Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

I think you can check the job manager log to find which task failed first, and 
then trace the task manager log containing the failed task to find the initial 
reason.
The failed task triggers cancellation of all the other tasks, and during the 
cancellation process a task that is blocked waiting for an output buffer cannot 
be interrupted by the canceler thread, which is what your description shows. So 
I think the cancellation process is not the key point here and is expected 
behavior. Maybe it did not cause an OOM at all.
If the task cannot be interrupted within the configured timeout during 
canceling, the task manager process will eventually exit, which triggers 
restarting the job.
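
For reference, the interrupt/kill behaviour described above is governed by the task
cancellation settings in flink-conf.yaml, roughly along these lines (the values shown
are the defaults as far as I know, so please double-check them against the docs for
your Flink version; the "30 seconds" in the stack dump corresponds to the interval):

task.cancellation.interval: 30000    # ms between repeated interrupt attempts on a stuck task
task.cancellation.timeout: 180000    # ms after which a task that still won't terminate makes the TaskManager process exit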

Zhijiang
--
From: Gerard Garcia
Sent: 2018-07-02 (Monday) 18:29
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong 
to the unresponsive task, which is why we suspect that at some point it did not 
have enough memory to serialize the message and blocked. I've also found that 
when it hung, several output buffers were full (see attached image 
buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason, and that is what filled the 
output buffers of the operators preceding the blocked one. I'll have to 
continue investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
 wrote:
 Hi Gerard,

From the stack below, it can only be inferred that the task was canceled, 
which may have been triggered by the job manager because of another task's 
failure. If the task cannot be interrupted within the configured timeout, the 
task manager process will exit. Do you see any OutOfMemory messages in the 
task manager log? Normally the output serialization buffer is managed by the 
task manager framework and will not cause OOM; on the input deserialization 
side there is a temporary byte array per channel for holding partial records, 
which is not managed by the framework. I think you should confirm whether an 
OOM happened and where it was caused. Maybe check the task failure logs.

Zhijiang

--
From: gerardg
Sent: 2018-06-30 (Saturday) 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) 
java.nio.ByteBuffer.wrap(ByteBuffer.java:396) 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 scala.collection.immutable.List.foreach(List.scala:392) 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 

Mis-reported metrics using SideOutput stream + Broadcast

2018-07-02 Thread Cliff Resnick
Our topology has a metadata source that we push via Broadcast. Because this
metadata source is critical, but sometimes late, we added a buffering
mechanism via a SideOutput. We call the initial look-up from Broadcast
"join"  and the secondary, state-backed buffered  lookup, "late-join"

Today I noticed that if we implement the late join using a
KeyedBroadcastProcessFunction (so we can set TTL timers while using
broadcast), everything seems to work. However, even though our
internal metrics show the correct numbers, the numbers in the Flink UI
falsely indicate that:

1) No broadcast data is sent to the late join, meaning the Flink metrics
for the metadata operator do not indicate any extra records sent.
2) Primary Join's main stream (not Side Output) is indicated as being sent
to Late Join, meaning the Flink metrics input record number from Primary
Join matches Late Join's, even though our logs and internal metrics might
show zero traffic.

If I do the late join via CoProcessFunction using a metadata keyed stream
instead of broadcast, then the Flink UI shows the correct numbers
(unfortunately there is another side issue when we take that tack but I
won't go into that here).

I hope this was not too confusing. Again the issue is not that this does
not work -- it just looks like it does not work in the Flink UI.

Below is the approximate code. Perhaps I'm doing something wrong that
causes the weird reporting?

val metadata = MetadataTable
  .streamFromKafka(env)

val broadcast = createBroadcast(metadata)

val metadataJoined = sourceTables
  .union(source1Tables)
  .union(source2Tables)
  .connect(broadcast)
  .process(BroadcastMetadataJoin()) // this operator will send side output data using Metadata.sideOutputTag
  .name("join")

val lateJoined = metadataJoined
  .getSideOutput(Metadata.sideOutputTag)
  .keyBy(_.primaryKey.getMetadataId)
  .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
  .process(KeyedBroadcastMetadataJoin())
  .name("late-join")


Re: Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

2018-07-02 Thread Hequn Cheng
Hi Mich,

It seems the writeMode has not been set correctly. Have you ever tried

> .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE);
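
For reference, against the code quoted below the full call chain would look roughly
like this (a sketch; note that, as far as I know, writeAsText has no append mode, so
OVERWRITE simply replaces the file on each run):

import org.apache.flink.core.fs.FileSystem

val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
  .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
  .setParallelism(1)  // keep a single output file instead of a directory of part files
env.execute("Flink Kafka Example")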


On Mon, Jul 2, 2018 at 10:44 PM, Mich Talebzadeh 
wrote:

> Flink 1.5
>
> This streaming data is written to a file
>
> val stream = env
>  .addSource(new FlinkKafkaConsumer09[String]("md", new
> SimpleStringSchema(), properties))
>  .writeAsText("/tmp/md_streaming.txt")
>  env.execute("Flink Kafka Example")
>
> The error states
>
> Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt
> already exists. Existing files and directories are not overwritten in
> NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and
> directories.
>
> Is there any append in writeAsText? I tried OVERWRITE but it did not work.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Description of Flink event time processing

2018-07-02 Thread Elias Levy
The documentation of how Flink handles event time and watermarks is spread
across several places.  I've been wanting a single location that summarizes
the subject, and as none was available, I wrote one up.

You can find it here:
https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing

I'd appreciate feedback, particularly about the correctness of the
described behavior.


Kafka Avro Table Source

2018-07-02 Thread Will Du
Hi folks,
I am working on mapping an Avro table source to a Kafka source. Looking at the 
example, I think the current Flink v1.5.0 connector is not flexible enough. I 
wonder if I have to specify the Avro record class to read from Kafka.

For withSchema, the schema can be obtained from a schema registry. However, the 
Avro class seems to be hard-coded.

thanks,
Will

KafkaTableSource source = Kafka010AvroTableSource.builder()
  // set Kafka topic
  .forTopic("sensors")
  // set Kafka consumer properties
  .withKafkaProperties(kafkaProps)
  // set Table schema
  .withSchema(TableSchema.builder()
.field("sensorId", Types.LONG())
.field("temp", Types.DOUBLE())
.field("time", Types.SQL_TIMESTAMP()).build())
  // set class of Avro record
  .forAvroRecordClass(SensorReading.class)  // ? Any way to get this without hard-coding the class?
  .build();



Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
This is becoming very tedious.

As suggested I changed the kafka dependency from

libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
to

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

and compiled and ran the job again, and it failed. This is the log file:

2018-07-02 21:38:38,656 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 1.1.0
2018-07-02 21:38:38,656 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : fdcf75ea326b8e07
2018-07-02 21:38:38,696 INFO
org.apache.kafka.clients.Metadata - Cluster ID:
3SqEt4DcTruOr_SlQ6fqTQ
2018-07-02 21:38:38,698 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 1 partitions from the
committed group offsets in Kafka: [KafkaTopicPartition{topic='md',
partition=0}]
2018-07-02 21:38:38,702 INFO
org.apache.kafka.clients.consumer.ConsumerConfig  -
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [rhes75:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = md_streaming
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-07-02 21:38:38,705 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 1.1.0
2018-07-02 21:38:38,705 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : fdcf75ea326b8e07
2018-07-02 21:38:38,705 WARN
org.apache.kafka.common.utils.AppInfoParser   - Error
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-2
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:785)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:644)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482)
at

Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
Hi,

This is the code

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

object md_streaming
{
  def main(args: Array[String])
  {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "rhes75:9092")
properties.setProperty("zookeeper.connect", "rhes75:2181")
properties.setProperty("group.id", "md_streaming")
val stream = env
 .addSource(new FlinkKafkaConsumer09[String]("md", new
SimpleStringSchema(), properties))
 .writeAsText("/tmp/md_streaming.txt")
env.execute("Flink Kafka Example")
  }
}

and this is the sbt dependencies

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
"1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"


Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 2 Jul 2018 at 17:45, Ted Yu  wrote:

> Here is the signature of assign :
>
> public void assign(Collection<TopicPartition> partitions) {
>
> Looks like RestClusterClient was built against one version of Kafka but
> runs against a different version.
>
> Please check the sbt dependency and the version of Kafka jar on the
> classpath.
>
> Thanks
>
> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh  > wrote:
>
>> Have you seen this error by any chance in flink streaming with Kafka
>> please?
>>
>> org.apache.flink.client.program.ProgramInvocationException:
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> at md_streaming$.main(md_streaming.scala:30)
>> at md_streaming.main(md_streaming.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>> at
>> 

Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Jörn Franke
Looks like a version issue; have you made sure that the Kafka version is 
compatible?

> On 2. Jul 2018, at 18:35, Mich Talebzadeh  wrote:
> 
> Have you seen this error by any chance in flink streaming with Kafka  please?
> 
> org.apache.flink.client.program.ProgramInvocationException: 
> java.lang.NoSuchMethodError: 
> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at md_streaming$.main(md_streaming.scala:30)
> at md_streaming.main(md_streaming.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
> 
> thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Regarding external metastore like HIVE

2018-07-02 Thread Shivam Sharma
Hi,

I have read in the Flink documentation that Flink supports a metastore, which
is currently in-memory. Is the Flink community planning to implement support
for an external metastore like Hive?

Thanks

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma


java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
Have you seen this error by any chance in flink streaming with Kafka
please?

org.apache.flink.client.program.ProgramInvocationException:
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at md_streaming$.main(md_streaming.scala:30)
at md_streaming.main(md_streaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)


thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: The program didn't contain a Flink job

2018-07-02 Thread Rong Rong
Hmm. That's strange.

Can you explain a little more on how your YARN cluster is set up and how
you configure the submission context?
Also, did you try submitting the jobs in detach mode?

Is this happening from time to time for one specific job graph? Or it is
consistently throwing the exception for the same job?

--
Rong



On Mon, Jul 2, 2018 at 7:57 AM eSKa  wrote:

> No.
> execute was called, and all calculations succeeded - there were jobs on the
> dashboard with status FINISHED.
> after execute we had our logs claiming that everything succeeded.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: The program didn't contain a Flink job

2018-07-02 Thread eSKa
No.
execute was called, and all calculations succeeded - there were jobs on the
dashboard with status FINISHED.
after execute we had our logs claiming that everything succeeded.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: The program didn't contain a Flink job

2018-07-02 Thread Rong Rong
Did you forget to call executionEnvironment.execute() after you define your
Flink job?

--
Rong

On Mon, Jul 2, 2018 at 1:42 AM eSKa  wrote:

> Hello, We are currently running jobs on Flink 1.4.2. Our use case is as
> follows:
> - service gets a request from a customer
> - we submit a job to Flink using YarnClusterClient
> Sometimes we have up to 6 jobs at the same time.
>
> From time to time we get the error below:
> The program didn't contain a Flink job.
> org.apache.flink.client.program.ProgramMissingJobException: The program
> didn't contain a Flink job.
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:398)
>
> From the logs we can see that the job's main method returns the correct
> status, but for some reason Flink later throws that exception anyway. Do
> you know what could be the cause here and how to prevent it from happening?
> --
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>


Re: Memory Leak in ProcessingTimeSessionWindow

2018-07-02 Thread ashish pok
 All,
I have been doing a little digging on this and, to Stefan's point, the 
increasing memory (not necessarily a leak) was essentially because of keys that 
kept increasing, since I was using time buckets concatenated with the actual 
key to make unique sessions.
Taking a couple of steps back, the use case is a very simple tumbling window of 
15 mins by key. The stream can be viewed simply as:
||
We have a few pipelines of this type, and one catch here is that we wanted to 
create an app which can process both historical and current data. Historical 
data comes in mainly because users ad-hoc request a "backfill". In order to 
easily manage the processing pipeline, we are making no distinction between 
historical and current data, as processing is based on event time.
1) Of course, the easiest way to solve this problem is to create a 
TumblingWindow of 15 mins with some allowed lateness. One issue here was that 
watermarks are moved forward and backfill data appears to be treated as 
late-arriving data, which is correct behavior from Flink's perspective but 
seems to cause issues in how we are trying to handle the streams.
2) Another issue is that our data collectors are highly distributed - we 
regularly get data from later event-time buckets faster than from older 
buckets. It is also more consistent to actually create the 15-min buckets 
using the concept of a Session instead. So I am creating a key with | and a 
session gap of, say, 10 mins. This works perfectly from a business-logic 
perspective. However, now I am introducing quite a lot of keys which, based on 
my heap dumps, seem to be hanging around and causing memory issues.
3) We converted the apps to a Process function and manage all state using the 
key defined in step (2), registering/unregistering timeouts ourselves.
Solution (3) seems to be working quite stably from a memory perspective. 
However, it just feels like, with so many high-level APIs available, we are 
not using them properly and keep reverting to the low-level Process APIs - in 
the last month we have migrated about 5 or 6 apps to Process now :)
For solution (2), it feels like any other Session aggregation use case will 
have the same issue of keys hanging around (e.g. click streams with user 
sessions). Isn't there a way to clear those session windows? Sorry, I just 
feel like we are missing something simple and keep reverting to low-level APIs 
instead.
Thanks,
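
For what it's worth, a rough Scala sketch of the low-level pattern being discussed:
key by the element ID only, keep the per-bucket aggregate in MapState indexed by the
bucket end timestamp, and remove that entry when the timer fires so nothing lingers
per key. The record shapes and the aggregation (a simple sum) are hypothetical; this
only illustrates the cleanup, not the actual pipeline above:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// hypothetical record shapes
case class Sample(elementId: String, eventTime: Long, value: Double)
case class BucketResult(elementId: String, bucketEnd: Long, sum: Double)

class FifteenMinBuckets extends ProcessFunction[Sample, BucketResult] {

  private val bucketMillis = 15 * 60 * 1000L
  private var sums: MapState[java.lang.Long, java.lang.Double] = _
  private var elementId: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    sums = getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Long, java.lang.Double](
        "bucket-sums", classOf[java.lang.Long], classOf[java.lang.Double]))
    elementId = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("element-id", classOf[String]))
  }

  override def processElement(value: Sample,
                              ctx: ProcessFunction[Sample, BucketResult]#Context,
                              out: Collector[BucketResult]): Unit = {
    elementId.update(value.elementId)
    val bucketEnd = (value.eventTime / bucketMillis + 1) * bucketMillis
    val current = Option(sums.get(bucketEnd)).map(_.doubleValue()).getOrElse(0.0)
    sums.put(bucketEnd, current + value.value)
    // one timer per bucket end; duplicate registrations for the same timestamp are de-duplicated
    ctx.timerService().registerEventTimeTimer(bucketEnd)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Sample, BucketResult]#OnTimerContext,
                       out: Collector[BucketResult]): Unit = {
    val sum = Option(sums.get(timestamp)).map(_.doubleValue()).getOrElse(0.0)
    out.collect(BucketResult(elementId.value(), timestamp, sum))
    // the important part: remove the finished bucket so per-key state does not pile up
    sums.remove(timestamp)
  }
}

// usage (sketch): samples.keyBy(_.elementId).process(new FifteenMinBuckets)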

On Friday, June 22, 2018, 9:00:14 AM EDT, ashish pok  
wrote:  
 
Stefan, All,
If there are no further thoughts on this, I am going to switch my app to the 
low-level Process API. I still think there is an easier solution here which I 
am missing, but I will revisit that after I fix the production issue.
Thanks, Ashish



On Thursday, June 21, 2018, 7:28 AM, ashish pok  wrote:

Hi Stefan,
Thanks for outlining the steps; they are similar to what we have been doing for 
OOM issues.
However, I was looking for something more high level: whether state / key 
handling needs some sort of cleanup that is not done by default. I am about 99% 
(nothing is certain :)) sure that if I switch this app to a lower-level API 
like Process Function and manage my own state and timers, I will not see this 
issue. When I had the same issue in the past it was for a Global Window, and 
Fabian pointed out that new keys were constantly being created. I built a 
simple Process Function for that and the issue went away. I think your first 
statement sort of hints at that as well. So let me hone in on that. I am 
processing time-series data for network elements. Keys are the 10-min floor of 
event time concatenated with the element ID. The idea here was to create 10-min 
buckets of data with windows that start with the first event in that bucket and 
fire when no events arrive for 12 or so minutes. So new keys are definitely 
being created. So:
1- Am I adding to the memory constantly by doing that? Sounds like it, based on 
your comments.
2- If so, what is the way to clear those keys when windows fire, if any?
3- It seems like a very simple use case, so now I am wondering if I am even 
using the right high-level API?
Thanks, Ashish


Sent from Yahoo Mail for iPhone


On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter 
 wrote:

Hi,
it is possible that the number of processing-time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing, this can happen.
To analyse the heap dump, I usually take the following approach:
- obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It 
should still give you the problematic object accumulation, if there is one.
- look at the statistics of „heavy hitter“ classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this 
will show you classes that are also part of classes that rank higher up, e.g.

Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

2018-07-02 Thread Mich Talebzadeh
Flink 1.5

This streaming data is written to a file

val stream = env
 .addSource(new FlinkKafkaConsumer09[String]("md", new
SimpleStringSchema(), properties))
 .writeAsText("/tmp/md_streaming.txt")
 env.execute("Flink Kafka Example")

The error states

Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt
already exists. Existing files and directories are not overwritten in
NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and
directories.

Is there any append in writeAsText? I tried OVERWRITE but it did not work.

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: How to partition within same physical node in Flink

2018-07-02 Thread ashish pok
 Thanks Fabian! It sounds like KeyGroup will do the trick if that can be made 
publicly accessible.
On Monday, July 2, 2018, 5:43:33 AM EDT, Fabian Hueske  
wrote:  
 
 Hi Ashish, hi Vijay,
Flink does not distinguish between different parts of a key (parent, child) in 
the public APIs. However, there is an internal concept of KeyGroups which is 
similar to what you call physical partitioning. A KeyGroup is a group of keys 
that are always processed on the same physical node. The motivation for this 
feature is operator scaling because all keys of a group are always processed by 
the same node and hence their state is always distributed together. However, 
AFAIK, KeyGroups are not exposed to the user API. Moreover, KeyGroups are 
distributed to slots, i.e., each KeyGroup is processed by a single slot, but 
each slot might processes multiple key groups. This distribution is done with 
hash partitioning and hence hard to tune.

There might be a way to tweak this by implementing an own low-level operator 
but I'm not sure. Stefan (in CC) might be able  to give some hints.
Best, Fabian

2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan :

Thanks for the clarification, Fabian.
This is what I compromised on for my use case - it doesn't exactly do what I
intended to do. Partition by a key, and then spawn threads inside that
partition to do my task and then finally repartition again (for a subsequent
connect).

DataStream keyedByCamCameraStream = env
    .addSource(new Source())
    .keyBy((cameraWithCube) -> cameraWithCube.getCam());
AsyncFunction cameraWithCubeAsyncFunction =
    new SampleAsyncFunction(, nThreads); // spawn threads here with the second key ts here
DataStream cameraWithCubeDataStreamAsync =
    AsyncDataStream.orderedWait(keyedByCamCameraStream,
        cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nThreads)
        .setParallelism(parallelCamTasks); // capacity = max # of inflight requests - how much; timeout - max time until considered failed
DataStream cameraWithCubeDataStream =
    cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.getTs());

On Thu, Jun 28, 2018 at 9:22 AM ashish pok  wrote:

Fabian, All,
Along this same line, we have a data source where we have a parent key and a 
child key. We need to first keyBy parent and then by child. If we want to have 
physical partitioning in a way where the physical partitioning happens first by 
parent key with localized grouping by child key, is there a need to use a 
custom partitioner? Obviously we can keyBy twice, but we were wondering if we 
can minimize the re-partition stress.
Thanks,
Ashish


- Ashish

On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske  wrote:

Hi Vijay,
Flink does not provide fine-grained control to place keys to certain slots or 
machines. 
When specifying a key, it is up to Flink (i.e., its internal hash function) 
where the data is processed. This works well for large key spaces, but can be 
difficult if you have only a few keys.
So, even if you keyBy(cam) and handle the parallelization of seq# internally 
(which I would not recommend), it might still happen that the data of two 
cameras is processed on the same slot.The only way to change that would be to 
fiddle with the hash of your keys, but this might give you a completely 
different distribution when scaling out the application at a later point in 
time.
Best, Fabian


2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan :



Hi Fabian,Thanks once again for your reply. I need to get the data from each 
cam/camera into 1 partition/slot and not move the gigantic video data around as 
much as I perform other operations on it. For eg, I can get seq#1 and seq#2 for 
cam1 in cam1 partition/slot and then combine, split,parse, stitch etc. 
operations on it in multiple threads within the same cam1 partition.
I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in 1 
partition(eg: cam1). The idea is to then work within the cam1 partition with 
various seq#'s 1,2 etc on various threads within the same slot/partition of 
TaskManager.
The data is stored in EFS keyed based on seq#/cam# folder structure.
Our actual problem is managing network bandwidth as a resource in each 
partition. We want to make sure that the processing of 1 camera(split into 
multiple seq# tasks) is not running on the same node as the processing of 
another camera as in that case, the required network bandwidth for storing the 
output of the process running in the partition would exceed the network 
bandwidth of the hardware. Camera processing is expected to run on the same 
hardware as the video decode step which is an earlier sequential process in the 
same Dataflow pipeline.
I guess I might have to use a ThreadPool within each Slot(cam partition) to 
work on each seq# ??
TIA



On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:





Hi,
keyBy() does not work hierarchically. 

Re: How to partition within same physical node in Flink

2018-07-02 Thread Fabian Hueske
Hi Ashish, hi Vijay,

Flink does not distinguish between different parts of a key (parent, child)
in the public APIs. However, there is an internal concept of KeyGroups
which is similar to what you call physical partitioning. A KeyGroup is a
group of keys that are always processed on the same physical node. The
motivation for this feature is operator scaling because all keys of a group
are always processed by the same node and hence their state is always
distributed together. However, AFAIK, KeyGroups are not exposed to the user
API. Moreover, KeyGroups are distributed to slots, i.e., each KeyGroup is
processed by a single slot, but each slot might processes multiple key
groups. This distribution is done with hash partitioning and hence hard to
tune.

There might be a way to tweak this by implementing an own low-level
operator but I'm not sure. Stefan (in CC) might be able  to give some hints.

Best, Fabian
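
As a possible workaround at the API level, partitionCustom lets you pin each camera to
an explicit subtask index, at the price of losing the KeyedStream (so keyed state and
timers are not available downstream unless you keyBy again). A rough Scala sketch,
where cameraStream, parallelCamTasks and CameraWithCube.getCam are assumed from the
earlier mails and the placement function is only an example:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// hypothetical explicit placement: camera id -> subtask index
class CameraPartitioner extends Partitioner[String] {
  override def partition(cam: String, numPartitions: Int): Int =
    math.abs(cam.hashCode) % numPartitions  // replace with an explicit cam -> slot map if needed
}

val placed = cameraStream
  .partitionCustom(new CameraPartitioner, (c: CameraWithCube) => c.getCam)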

2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan :

> Thanks for the clarification, Fabian.
> This is what I compromised on for my use-case-doesn't exactly do what I
> intended to do.
> Partition by a key, and then spawn threads inside that partition to do my
> task and then finally repartition again(for a subsequent connect).
>
> DataStream keyedByCamCameraStream = env
> .addSource(new Source())
> .keyBy((cameraWithCube) -> cameraWithCube.getCam() );
> AsyncFunction cameraWithCubeAsyncFunction =
> new SampleAsyncFunction(, nThreads);//spawn threads
> here with the second key ts here
> DataStream cameraWithCubeDataStreamAsync =
> AsyncDataStream.orderedWait(keyedByCamCameraStream,
> cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nThreads)
> .setParallelism(parallelCamTasks);//capacity=max
> # of inflight requests - how much; timeout - max time until considered
> failed
>
> DataStream cameraWithCubeDataStream =
> cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
> cameraWithCube.getTs());
>
>
> On Thu, Jun 28, 2018 at 9:22 AM ashish pok  wrote:
>
>> Fabian, All,
>>
>> Along this same line, we have a datasource where we have parent key and
>> child key. We need to first keyBy parent and then by child. If we want to
>> have physical partitioning in a way where the physical partitioning happens
>> first by parent key with localized grouping by child key, is there a need to
>> use a custom partitioner? Obviously we can keyBy twice but was wondering if
>> we can minimize the re-partition stress.
>>
>> Thanks,
>>
>> Ashish
>>
>>
>> - Ashish
>>
>> On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske 
>> wrote:
>>
>> Hi Vijay,
>>
>> Flink does not provide fine-grained control to place keys to certain
>> slots or machines.
>> When specifying a key, it is up to Flink (i.e., its internal hash
>> function) where the data is processed. This works well for large key
>> spaces, but can be difficult if you have only a few keys.
>>
>> So, even if you keyBy(cam) and handle the parallelization of seq#
>> internally (which I would not recommend), it might still happen that the
>> data of two cameras is processed on the same slot.
>> The only way to change that would be to fiddle with the hash of your
>> keys, but this might give you a completely different distribution when
>> scaling out the application at a later point in time.
>>
>> Best, Fabian
>>
>> 2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan :
>>
>> Hi Fabian,
>> Thanks once again for your reply. I need to get the data from each
>> cam/camera into 1 partition/slot and not move the gigantic video data
>> around as much as I perform other operations on it. For eg, I can get seq#1
>> and seq#2 for cam1 in cam1 partition/slot and then combine, split,parse,
>> stitch etc. operations on it in multiple threads within the same cam1
>> partition.
>>
>> I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in
>> 1 partition(eg: cam1). The idea is to then work within the cam1 partition
>> with various seq#'s 1,2 etc on various threads within the same
>> slot/partition of TaskManager.
>>
>> The data is stored in EFS keyed based on seq#/cam# folder structure.
>>
>> Our actual problem is managing network bandwidth as a resource in each
>> partition. We want to make sure that the processing of 1 camera(split into
>> multiple seq# tasks) is not running on the same node as the processing of
>> another camera as in that case, the required network bandwidth for storing
>> the output of the process running in the partition would exceed the network
>> bandwidth of the hardware. Camera processing is expected to run on the same
>> hardware as the video decode step which is an earlier sequential process in
>> the same Dataflow pipeline.
>>
>> I guess I might have to use a ThreadPool within each Slot(cam partition)
>> to work on each seq# ??
>>
>> TIA
>>
>> On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:
>>
>> Hi,
>>
>> keyBy() does not work 

run time error java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09

2018-07-02 Thread Mich Talebzadeh
Hi,

I created a jar file with sbt with this sbt file

 cat md_streaming.sbt
name := "md_streaming"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.3"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
"1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"

and the Scala code is very basic

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object md_streaming
{
  def main(args: Array[String])
  {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "rhes75:9092")
properties.setProperty("zookeeper.connect", "rhes75:2181")
properties.setProperty("group.id", "md_streaming")
val stream = env
 .addSource(new FlinkKafkaConsumer09[String]("md", new
SimpleStringSchema(), properties))
 .writeAsText("/tmp/md_streaming.txt")
env.execute("Flink Kafka Example")
  }
}

It compiles OK as follows

Compiling md_streaming
[info] Set current project to md_streaming (in build
file:/home/hduser/dba/bin/flink/md_streaming/)
[success] Total time: 0 s, completed Jul 2, 2018 10:16:05 AM
[info] Set current project to md_streaming (in build
file:/home/hduser/dba/bin/flink/md_streaming/)
[info] Updating
{file:/home/hduser/dba/bin/flink/md_streaming/}md_streaming...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[warn] Scala version was updated by one of library dependencies:
[warn]  * org.scala-lang:scala-library:(2.11.8, 2.11.11, 2.11.6, 2.11.7) ->
2.11.12
[warn] To force scalaVersion, add the following:
[warn]  ivyScala := ivyScala.value map { _.copy(overrideScalaVersion =
true) }
[warn] There may be incompatibilities among your library dependencies.
[warn] Here are some of the libraries that were evicted:
[warn]  * org.apache.kafka:kafka_2.11:0.8.2.2 -> 1.1.0
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[warn] there was one deprecation warning; re-run with -deprecation for
details
[warn] one warning found
[info] Packaging
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming_2.11-1.0.jar
...
[info] Done packaging.
[success] Total time: 3 s, completed Jul 2, 2018 10:16:10 AM
Completed compiling

The content of jar file is as follows
jar tvf
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming_2.11-1.0.jar
   277 Mon Jul 02 10:16:10 BST 2018 META-INF/MANIFEST.MF
  2003 Mon Jul 02 10:16:10 BST 2018 md_streaming$.class
   599 Mon Jul 02 10:16:10 BST 2018 md_streaming.class

When I run it with flink run I get this error

flink run
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming_2.11-1.0.jar
Starting execution of program
java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09
at md_streaming$.main(md_streaming.scala:22)
at md_streaming.main(md_streaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
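
A common cause of this NoClassDefFoundError is submitting a thin jar: as the jar tvf
output above shows, the jar contains only the two job classes, so the Kafka connector
is neither bundled with the job nor on the cluster classpath. A rough sketch of an
sbt-assembly setup that bundles the connector into a fat jar (plugin version, scopes
and merge handling are illustrative, not verified against this build):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt (sketch): keep the connector in the default scope so it ends up in the
// fat jar, and mark the core Flink modules as "provided" since the cluster already
// ships them; META-INF conflicts may additionally need an assemblyMergeStrategy
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"

Building with "sbt assembly" and submitting the resulting assembly jar with flink run
should then find the connector classes; alternatively, the connector jar can be placed
in Flink's lib/ directory on the cluster.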
  

Dynamic Rule Evaluation in Flink

2018-07-02 Thread Aarti Gupta
 Hi,

We are currently evaluating Flink to build a real time rule engine that
looks at events in a stream and evaluates them against a set of rules.

The rules are dynamically configured and can be of three types -
1. Simple Conditions - these require you to look inside a single event.
Example, match rule if A happens.
2. Aggregations - these require you to aggregate multiple events. Example,
match rule if more than five A's happen.
3. Complex patterns - these require you to look at multiple events and
detect patterns. Example, match rule if A happens and then B happens.

Since the rules are dynamically configured, we cannot use CEP.

As an alternative, we are using connected streams and the CoFlatMap
function to store the rules in shared state, and evaluate each incoming
event against the stored rules.  Implementation is similar to what's
outlined here
.

My questions -

   1. Since the CoFlatMap function works on a single event, how do we
   evaluate rules that require aggregations across events. (Match rule if more
   than 5 A events happen)
   2. Since the CoFlatMap function works on a single event, how do we
   evaluate rules that require pattern detection across events (Match rule if
   A happens, followed by B).
   3. How do you dynamically define a window function.
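
For question (1), one option (a sketch assuming Flink 1.5's broadcast state; the Rule
shape and all names are hypothetical) is to keep per-rule counters in keyed state and
clear them with timers, while the dynamically configured rules themselves live in
broadcast state:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// hypothetical shapes
case class Event(key: String, eventType: String)
case class Rule(id: String, eventType: String, threshold: Long, windowMillis: Long)
case class Alert(ruleId: String, key: String)

class CountingRuleEvaluator extends KeyedBroadcastProcessFunction[String, Event, Rule, Alert] {

  private val rulesDesc =
    new MapStateDescriptor[String, Rule]("rules", classOf[String], classOf[Rule])
  private var counts: MapState[String, java.lang.Long] = _  // ruleId -> count for this key

  override def open(parameters: Configuration): Unit = {
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, java.lang.Long]("counts", classOf[String], classOf[java.lang.Long]))
  }

  override def processElement(event: Event, ctx: ReadOnlyContext, out: Collector[Alert]): Unit = {
    for (entry <- ctx.getBroadcastState(rulesDesc).immutableEntries().asScala) {
      val rule = entry.getValue
      if (rule.eventType == event.eventType) {
        val c = Option(counts.get(rule.id)).map(_.longValue()).getOrElse(0L) + 1
        counts.put(rule.id, c)
        if (c >= rule.threshold) {
          out.collect(Alert(rule.id, event.key))
          counts.remove(rule.id)
        } else {
          // clear stale partial counts once the rule's window has passed
          ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + rule.windowMillis)
        }
      }
    }
  }

  override def processBroadcastElement(rule: Rule, ctx: Context, out: Collector[Alert]): Unit = {
    ctx.getBroadcastState(rulesDesc).put(rule.id, rule)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[Alert]): Unit = {
    counts.clear()  // simplistic: a real implementation would track which rule's window expired
  }
}

Question (2) could be handled along the same lines by keeping the last matched step of
each pattern per key in state instead of a plain counter, and question (3) then
reduces to registering timers per rule, as above, rather than relying on a pre-defined
window assigner.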


--Aarti


-- 
Aarti Gupta 
Director, Engineering, Correlation


aagu...@qualys.com
T


Qualys, Inc. – Blog  | Community
 | Twitter 





The program didn't contain a Flink job

2018-07-02 Thread eSKa
Hello,
We are currently running jobs on Flink 1.4.2. Our use case is as follows:
- service gets a request from a customer
- we submit a job to Flink using YarnClusterClient
Sometimes we have up to 6 jobs at the same time.

From time to time we get the error below:
The program didn't contain a Flink job. 
org.apache.flink.client.program.ProgramMissingJobException: The program
didn't contain a Flink job. 
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:398)

From the logs we can see that the job's main method returns the correct status,
but for some reason Flink later throws that exception anyway. Do you know
what could be the cause here and how to prevent it from happening?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

[ANNOUNCE] Weekly community update #27

2018-07-02 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #27. Please post any news and
updates you want to share with the community to this thread.

# Feature freeze and release date for Flink 1.6

The community is currently discussing the feature freeze and, thus, also
the release date for Flink 1.6 [1] which will happen soon. Join the
discussion to learn more and voice your opinion.

# DynamoDB connector

The community is currently discussing the possibility of extending the existing
FlinkKinesisConsumer to also consume data from DynamoDB [2]. If you have
ideas or experience with DynamoDB then please join the thread.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Feature-freeze-for-Flink-1-6-td23010.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Consuming-data-from-dynamoDB-streams-to-flink-td22963.html

Cheers,
Till


Re: Web history limit in flink 1.5

2018-07-02 Thread eSKa
thank you, I must have missed that option somehow :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
 Hi Gerard,

From the stack below, it can only be inferred that the task was canceled, 
which may have been triggered by the job manager because of another task's 
failure. If the task cannot be interrupted within the configured timeout, the 
task manager process will exit. Do you see any OutOfMemory messages in the 
task manager log? Normally the output serialization buffer is managed by the 
task manager framework and will not cause OOM; on the input deserialization 
side there is a temporary byte array per channel for holding partial records, 
which is not managed by the framework. I think you should confirm whether an 
OOM happened and where it was caused. Maybe check the task failure logs.

Zhijiang


--
From: gerardg
Sent: 2018-06-30 (Saturday) 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) 
java.nio.ByteBuffer.wrap(ByteBuffer.java:396) 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 scala.collection.immutable.List.foreach(List.scala:392) 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)