Re: kafka consumer exception

2019-02-19 Thread ForwardXu



   This comes from the Kafka consumer's client-id. Flink does not assign a unique
client-id, so the Kafka client auto-generates one ("consumer-" + a sequential id), and when two
consumers in the same JVM end up registering the same id, the AppInfo MBean registration fails.
See the Kafka issue https://issues.apache.org/jira/browse/KAFKA-3992 for details; the warning
itself is harmless.







------ Original message ------
From: "董鹏"
Date: 2019-02-20 (Wed) 3:02
To: "user-zh"
Subject: kafka consumer exception



Hi Flink experts, when using Flink on Kafka (version 1.0) we hit the exception below.
It does not affect the job or the results and is only thrown occasionally. Have you run into this problem?


[org.apache.kafka.common.utils.AppInfoParser] [AppInfoParser.java:60] - Error 
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-31
  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
  at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
  at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
  at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)

flink sink kafka exception

2019-02-19 Thread 董鹏
Hi Flink experts. When sinking to Kafka from Flink we occasionally see the exception below.
It does not affect the running job or the results; we would appreciate any ideas on what causes it.

2019-02-20 10:08:46.889 +0800 
[Source: rn -> Flat Map -> async wait operator -> async wait operator -> Sink: 
Unnamed (17/20)] ERROR [org.apache.flink.streaming.runtime.tasks.StreamTask] 
[StreamTask.java:481] - Error during disposal of stream operator. 
org.apache.kafka.common.KafkaException: Failed to close kafka producer
  at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
  at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
  at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
  at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
  at java.lang.Object.wait(Native Method)
  at java.lang.Thread.join(Thread.java:1260)
  at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
  ... 9 common frames omitted

kafka consumer exception

2019-02-19 Thread 董鹏
Hi Flink experts, when using Flink on Kafka (version 1.0) we hit the exception below.
It does not affect the job or the results and is only printed occasionally. Have you run into this problem?


[org.apache.kafka.common.utils.AppInfoParser] [AppInfoParser.java:60] - Error 
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-31
  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
  at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
  at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
  at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)

Re: Reading messages from start - new job submission

2019-02-19 Thread Dian Fu
Hi Avi,

As described in the documentation: "If offsets could not be found for a 
partition, the auto.offset.reset setting in the properties will be used.". For 
starting from GroupOffset, the property "auto.offset.reset" will ONLY be 
respected when the group offset cannot be found for a partition. Otherwise, it 
will use the offset found.

Regards,
Dian
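
A minimal sketch of the two start positions being compared in this thread, assuming the 
FlinkKafkaConsumer010 connector that appears elsewhere in this digest (broker, group and 
topic names are illustrative):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
// Only consulted when no committed group offset exists for a partition.
props.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

// Default: start from the committed group offsets, falling back to auto.offset.reset.
consumer.setStartFromGroupOffsets();

// Alternative: always start from the earliest offset, ignoring committed offsets.
// consumer.setStartFromEarliest();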

> On Feb 20, 2019, at 1:49 AM, avilevi wrote:
> 
> Thanks for the answer,
> But my question is: why do I need to call myConsumer.setStartFromEarliest() if I set the
> property setProperty("auto.offset.reset", "earliest") in the consumer properties?
> I want the consumer to start reading from the earliest offset only if offsets could not
> be found, as stated in the documentation.
> Isn't that the expected behaviour?
> 
> Best wishes 
> Avi
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Dian Fu
DataStream.assignTimestampsAndWatermarks will add a watermark generator 
operator after each source operator(if their parallelism is the same which is 
true for the code you showed) and so if one instance of the source operator has 
no data, the corresponding watermark generator operator cannot generate 
watermark.

Regards,
Dian
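
This is also why an idle shard or partition stalls downstream event-time windows, as discussed 
in the quoted thread below. Not something proposed in this thread, but a hedged sketch of one 
common workaround: a periodic assigner that keeps advancing the watermark when its parallel 
instance has seen no data for a while. The "timestamp" field name, the timeouts and the 
ObjectNode import are assumptions for illustration.

import com.fasterxml.jackson.databind.node.ObjectNode; // or Flink's shaded Jackson, depending on the schema used
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<ObjectNode> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;
    private static final long IDLE_TIMEOUT_MS = 60_000L;

    private long maxEventTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(ObjectNode element, long previousElementTimestamp) {
        long ts = element.get("timestamp").asLong(); // assumes each event carries a "timestamp" field
        maxEventTimestamp = Math.max(maxEventTimestamp, ts);
        lastEventSeenAt = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (System.currentTimeMillis() - lastEventSeenAt > IDLE_TIMEOUT_MS) {
            // No events on this subtask for a while: advance the watermark on wall-clock
            // time so windows fed by busier partitions/shards can still close.
            return new Watermark(System.currentTimeMillis() - MAX_OUT_OF_ORDERNESS_MS);
        }
        return new Watermark(maxEventTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}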


> On Feb 20, 2019, at 12:56 AM, Stephen Connolly wrote:
> 
> Though I am explicitly assigning watermarks with 
> DataStream.assignTimestampsAndWatermarks and I see all the data flowing 
> through that... so shouldn't that override the watermarks from the original 
> source?
> 
> On Tue, 19 Feb 2019 at 15:59, Martin, Nick wrote:
> Yeah, that’s expected/known. Watermarks for the empty partition don’t 
> advance, so the window in your window function never closes.
> 
>  
> 
> There’s a ticket open to fix it 
> (https://issues.apache.org/jira/browse/FLINK-5479 
> ) for the kafka connector, 
> but in general any time one parallel instance of a source function isn’t 
> getting data you have to watch out for this.
> 
>  
> 
> From: Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
> Sent: Tuesday, February 19, 2019 6:32 AM
> To: user <user@flink.apache.org>
> Subject: EXT :Re: How to debug difference between Kinesis and Kafka
> 
>  
> 
> Hmmm my suspicions are now quite high. I created a file source that just 
> replays the events straight then I get more results
> 
>  
> 
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
> 
> Hmmm after expanding the dataset such that there was additional data that 
> ended up on shard-0 (everything in my original dataset was coincidentally 
> landing on shard-1) I am now getting output... should I expect this kind of 
> behaviour if no data arrives at shard-0 ever?
> 
>  
> 
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
> 
> Hi, I’m having a strange situation and I would like to know where I should 
> start trying to debug.
> 
>  
> 
> I have set up a configurable swap in source, with three implementations:
> 
>  
> 
> 1. A mock implementation
> 
> 2. A Kafka consumer implementation
> 
> 3. A Kinesis consumer implementation
> 
>  
> 
> From injecting a log and no-op map function I can see that all three sources 
> pass through the events correctly.
> 
>  
> 
> I then have a window based on event time stamps… and from inspecting the 
> aggregation function I can see that the data is getting aggregated…, I’m 
> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I 
> can retrieve the key
> 
>  
> 
> Here’s the strange thing, I only change the source (and each source uses the 
> same deserialization function) but:
> 
>  
> 
> When I use either Kafka or my Mock source, the WindowFunction gets called as 
> events pass the end of the window
> When I use the Kinesis source, however, the window function never gets 
> called. I have even tried injecting events into kinesis with really high 
> timestamps to flush the watermarks in my 
> BoundedOutOfOrdernessTimestampExtractor... but nothing
> I cannot see how this source switching could result in such a different 
> behaviour:
> 
>  
> 
> Properties sourceProperties = new Properties();
> 
> ConsumerFactory sourceFactory;
> 
> String sourceName = configParams.getRequired("source");
> 
> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> 
> case "kinesis":
> 
> sourceFactory = FlinkKinesisConsumer::new;
> 
> copyOptionalArg(configParams, "aws-region", sourceProperties, 
> AWSConfigConstants.AWS_REGION);
> 
> copyOptionalArg(configParams, "aws-endpoint", 
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> 
> copyOptionalArg(configParams, "aws-access-key", 
> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> 
> copyOptionalArg(configParams, "aws-secret-key", 
> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> 
> copyOptionalArg(configParams, "aws-profile", 
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> 
> break;
> 
> case "kafka":
> 
> sourceFactory = FlinkKafkaConsumer010::new;
> 
> copyRequiredArg(configParams, "bootstrap-server", 
> sourceProperties, "bootstrap.servers");
> 
> copyOptionalArg(configParams, "group-id", sourceProperties, 
> "group.id ");
> 
> break;
> 
> case "mock":
> 
> sourceFactory = MockSourceFunction::new;
> 
> break;
> 
> default:
> 
> throw new RuntimeException("Unknown source '" + sourceName + 
> '\'');
> 
> }
> 
>  
> 
> // set up the streaming 

How to use my custom log4j.properties when running minicluster in idea

2019-02-19 Thread peibin wang
Hi, 

   I am running my Flink job in IntelliJ IDEA with the mini cluster (not 
submitting it to a Flink cluster) for convenience.

Now I have put my custom log config files (both log4j.properties and 
logback.xml) in src/main/resources/, but they are not picked up. Are there any 
solutions?
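
One hedged suggestion, not confirmed in this thread: when the job runs in the IDE's mini 
cluster, another log4j.properties earlier on the classpath (for example from a test-scoped 
dependency) can shadow the one in src/main/resources. Pointing the logging frameworks at the 
files explicitly via VM options in the IDE run configuration removes that ambiguity (paths are 
illustrative):

-Dlog4j.configuration=file:///path/to/your/log4j.properties
-Dlogback.configurationFile=/path/to/your/logback.xml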



Re: Starting Flink cluster and running a job

2019-02-19 Thread Boris Lublinsky
Thanks Ken,
That was my first instinct as well, but to run on the cluster I am building an uber jar in
which I pin the Kafka clients jar version, and I also pin the Kafka version itself,
so I do not know where another version could come from.


Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 7:02 PM, Ken Krugler  wrote:
> 
> Hi Boris,
> 
> I haven’t seen this exact error, but I have seen similar errors caused by 
> multiple versions of jars on the classpath.
> 
> When I’ve run into this particular "XXX is not an instance of YYY" problem, 
> it often seems to be caused by a jar that I should have marked as provided in 
> my pom.
> 
> Though I’m typically running on a YARN cluster, not w/K8s, so maybe this 
> doesn’t apply.
> 
> — Ken
> 
> PS - I assume you’ve been reading 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>  
> 
> 
> 
>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky wrote:
>> 
>> Konstantin,
>> After experimenting with this for a while, I got to the root cause of the 
>> problem
>> I am running a version of a Taxi ride travel prediction as my sample.
>> It works fine in Intellij,
>> But when I am trying to put it in the docker (standard Debian 1.7 image)
>> It fails with a following error
>> 
>> 
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
>> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>  at 
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>  at 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>  at 
>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>  at 
>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>  at 
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
>> execution failed.
>>  at 
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at 
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>  ... 19 more
>> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
>> producer
>>  at 
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>>  at 
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>>  at 
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>>  at 
>> 

Re: Starting Flink cluster and running a job

2019-02-19 Thread Ken Krugler
Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by 
multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it 
often seems to be caused by a jar that I should have marked as provided in my 
pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this 
doesn’t apply.

— Ken

PS - I assume you’ve been reading 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
 



> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky  
> wrote:
> 
> Konstantin,
> After experimenting with this for a while, I got to the root cause of the 
> problem
> I am running a version of a Taxi ride travel prediction as my sample.
> It works fine in Intellij,
> But when I am trying to put it in the docker (standard Debian 1.7 image)
> It fails with a following error
> 
> 
> The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   at 
> com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>   at 
> com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 19 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>  

Re: Jira issue Flink-11127

2019-02-19 Thread Boris Lublinsky
Thanks Konstantin

Unfortunately it does not work

The snippet from task manager yaml is

containers:
- name: taskmanager
  image: {{ .Values.image }}:{{ .Values.imageTag }}
  imagePullPolicy: {{ .Values.imagePullPolicy }}
  args:
  - taskmanager -Dtaskmanager.host=$(K8S_POD_IP)
  ports:
  - name: data
containerPort: 6121
  - name: rpc
containerPort: 6122
  - name: query
containerPort: 6125
  env:
  - name: FLINK_CONF_DIR
value: /etc/flink
  - name: K8S_POD_IP
valueFrom:
  fieldRef:
fieldPath: status.podIP
  resources:



The error is
/docker-entrypoint.sh: 62: exec: taskmanager -Dtaskmanager.host=10.131.0.97: 
not found


Did I misunderstand your instructions?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
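
For reference, the "exec: taskmanager -Dtaskmanager.host=10.131.0.97: not found" message 
suggests the whole string is passed to docker-entrypoint.sh as a single argument, so it is 
treated as one command name. Konstantin's reply quoted below splits the command and the -D 
option into separate list entries; a minimal sketch of that form:

  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP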

> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf  
> wrote:
> 
> Hi Boris, 
> 
> the solution is actually simpler than it sounds from the ticket. The only 
> thing you need to do is to set the "taskmanager.host" to the Pod's IP address 
> in the Flink configuration. The easiest way to do this is to pass this config 
> dynamically via a command-line parameter. 
> 
> The Deployment spec could looks something like this:
> containers:
> - name: taskmanager
>   [...]
>   args:
>   - "taskmanager.sh"
>   - "start-foreground"
>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>   [...]
>   env:
>   - name: K8S_POD_IP
> valueFrom:
>   fieldRef:
> fieldPath: status.podIP
> 
> Hope this helps and let me know if this works. 
> 
> Best, 
> 
> Konstantin
> 
> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> I was looking at this issue https://issues.apache.org/jira/browse/FLINK-11127 
> 
> Apparently there is a workaround for it.
> Is it possible provide the complete helm chart for it.
> Bits and pieces are in the ticket, but it would be nice to see the full chart
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
> 
>  
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



Re: Starting Flink cluster and running a job

2019-02-19 Thread Boris Lublinsky
Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of a taxi-ride travel prediction as my sample.
It works fine in IntelliJ, but when I try to run it in Docker (the standard Debian 1.7 image)
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
9340e7669e7344ab827fef4ddb5ba73d)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at 
com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at 
com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
producer
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of 
org.apache.kafka.common.serialization.Serializer
at 

Assigning timestamps and watermarks several times, several datastreams?

2019-02-19 Thread Aakarsh Madhavan
Hi!

Currently I am using Flink 1.4.2.

class TSWM implements AssignerWithPunctuatedWatermarks<POJO> {
  private long maxTS = Long.MIN_VALUE;

  @Override
  public Watermark checkAndGetNextWatermark(POJO event, long extractedTimestamp) {
    maxTS = Math.max(maxTS, event.TS);
    return new Watermark(maxTS); // emit a watermark at the highest timestamp seen so far
  }

  @Override
  public long extractTimestamp(POJO event, long previousElementTimestamp) {
    maxTS = Math.max(maxTS, event.TS);
    return event.TS;
  }
}

DataStream<POJO> ds1 = ... .assignTimestampsAndWatermarks(new TSWM());

DataStream<POJO> ds2 = ... .assignTimestampsAndWatermarks(new TSWM());

Suppose I run the code above; what I am confused about is the overall
watermarking behaviour.

Now I want to do the following:

ds1.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1),
Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);

ds2.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1),
Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);

The main doubt I have is how this works with the watermarks. Do
`ds1` and `ds2` have separate watermarks that don't concern each other? I.e.,
do they operate separately?

I am just not sure how the window trigger would work, for example, or how the
watermarks would advance. Do the watermarks reset and advance for each
stream separately so no data is lost?

Thanks!


Metrics for number of "open windows"?

2019-02-19 Thread Andrew Roberts
Hello,

I’m trying to track the number of currently-in-state windows in a keyed, 
windowed stream (stream.keyBy(…).window(…).trigger(…).process(…)) using Flink 
metrics. Are there any built in? Or any good approaches for collecting this 
data?

Thanks,

Andrew
-- 
Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you are
hereby notified that any dissemination, distribution or copying of this e-mail
is strictly prohibited. If you have received this e-mail in error, please
notify the sender and permanently delete the e-mail and any attachments
immediately. You should not retain, copy or use this e-mail or any attachment
for any purpose, nor disclose all or any part of the contents to any other
person. Thank you.


Re: Reading messages from start - new job submission

2019-02-19 Thread avilevi
Thanks for the answer,
But my question is: why do I need to call myConsumer.setStartFromEarliest() if I set the
property setProperty("auto.offset.reset", "earliest") in the consumer properties?
I want the consumer to start reading from the earliest offset only if offsets could not
be found, as stated in the documentation.
Isn't that the expected behaviour?

Best wishes 
Avi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Though I am explicitly assigning watermarks with
DataStream.assignTimestampsAndWatermarks and I see all the data flowing
through that... so shouldn't that override the watermarks from the original
source?

On Tue, 19 Feb 2019 at 15:59, Martin, Nick  wrote:

> Yeah, that’s expected/known. Watermarks for the empty partition don’t
> advance, so the window in your window function never closes.
>
>
>
> There’s a ticket open to fix it (
> https://issues.apache.org/jira/browse/FLINK-5479) for the kafka
> connector, but in general any time one parallel instance of a source
> function isn’t getting data you have to watch out for this.
>
>
>
> *From:* Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
> *Sent:* Tuesday, February 19, 2019 6:32 AM
> *To:* user 
> *Subject:* EXT :Re: How to debug difference between Kinesis and Kafka
>
>
>
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight then I get more results
>
>
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hmmm after expanding the dataset such that there was additional data that
> ended up on shard-0 (everything in my original dataset was coincidentally
> landing on shard-1) I am now getting output... should I expect this kind of
> behaviour if no data arrives at shard-0 ever?
>
>
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hi, I’m having a strange situation and I would like to know where I should
> start trying to debug.
>
>
>
> I have set up a configurable swap in source, with three implementations:
>
>
>
> 1. A mock implementation
>
> 2. A Kafka consumer implementation
>
> 3. A Kinesis consumer implementation
>
>
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
>
>
> I then have a window based on event time stamps… and from inspecting the
> aggregation function I can see that the data is getting aggregated…, I’m
> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
> can retrieve the key
>
>
>
> Here’s the strange thing, I only change the source (and each source uses
> the same deserialization function) but:
>
>
>
>- When I use either Kafka or my Mock source, the WindowFunction gets
>called as events pass the end of the window
>- When I use the Kinesis source, however, the window function never
>gets called. I have even tried injecting events into kinesis with really
>high timestamps to flush the watermarks in my
>BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a different
> behaviour:
>
>
>
> Properties sourceProperties = new Properties();
>
> ConsumerFactory sourceFactory;
>
> String sourceName = configParams.getRequired("source");
>
> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>
> case "kinesis":
>
> sourceFactory = FlinkKinesisConsumer::new;
>
> copyOptionalArg(configParams, "aws-region",
> sourceProperties, AWSConfigConstants.AWS_REGION);
>
> copyOptionalArg(configParams, "aws-endpoint",
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>
> copyOptionalArg(configParams, "aws-access-key",
> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>
> copyOptionalArg(configParams, "aws-secret-key",
> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>
> copyOptionalArg(configParams, "aws-profile",
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>
> break;
>
> case "kafka":
>
> sourceFactory = FlinkKafkaConsumer010::new;
>
> copyRequiredArg(configParams, "bootstrap-server",
> sourceProperties, "bootstrap.servers");
>
> copyOptionalArg(configParams, "group-id",
> sourceProperties, "group.id");
>
> break;
>
> case "mock":
>
> sourceFactory = MockSourceFunction::new;
>
> break;
>
> default:
>
> throw new RuntimeException("Unknown source '" + sourceName
> + '\'');
>
> }
>
>
>
> // set up the streaming execution environment
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
>
> // poll watermark every second because using
> BoundedOutOfOrdernessTimestampExtractor
>
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> env.enableCheckpointing(5000);
>
>
>
> SplitStream eventsByType =
> env.addSource(sourceFactory.create(
>
> configParams.getRequired("topic"),
>
> new ObjectNodeDeserializationSchema(),
>
> sourceProperties
>
> ))
>
> .returns(ObjectNode.class) // the use of ConsumerFactory
> erases the 

Re: FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread John Tipper
Hi Timo,

That’s great, thank you very much. If I’d like to contribute, is it best to 
wait until the roadmap has been published? And is this the best list to ask on, 
or is the development mailing list better?

Many thanks,

John

Sent from my iPhone

> On 19 Feb 2019, at 16:29, Timo Walther  wrote:
> 
> Hi John,
> 
> you are right that there was not much progress in the last years around these 
> two FLIPs. Mostly due to shift of priorities. However, with the big Blink 
> code contribution from Alibaba and joint development forces for a unified 
> batch and streaming runtime [1], it is very likely that also iterations and 
> thus machine learning algorithms will see more development efforts.
> 
> The community is working on roadmap page for the website. And I can already 
> reveal that a new iterations model is mentioned there. The new Flink roadmap 
> page can be expected in the next 2-3 weeks.
> 
> I hope this information helps.
> 
> Regards,
> Timo
> 
> [1] 
> https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
> 
>> On 19.02.19 at 12:47, John Tipper wrote:
>> Hi All,
>> 
>> Does anyone know what the current status is for FLIP-16 (loop fault 
>> tolerance) and FLIP-15 (redesign iterations) please? I can see lots of work 
>> back in 2016, but it all seemed to stop and go quiet since about March 2017. 
>> I see iterations as offering very interesting capabilities for Flink, so it 
>> would be good to understand how we can get this moving again.
>> 
>> Many thanks,
>> 
>> John
>> 
>> Sent from my iPhone
> 
> 


Re: FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread Timo Walther

Hi John,

you are right that there was not much progress in the last years around 
these two FLIPs. Mostly due to shift of priorities. However, with the 
big Blink code contribution from Alibaba and joint development forces 
for a unified batch and streaming runtime [1], it is very likely that 
also iterations and thus machine learning algorithms will see more 
development efforts.


The community is working on roadmap page for the website. And I can 
already reveal that a new iterations model is mentioned there. The new 
Flink roadmap page can be expected in the next 2-3 weeks.


I hope this information helps.

Regards,
Timo

[1] 
https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html


On 19.02.19 at 12:47, John Tipper wrote:

Hi All,

Does anyone know what the current status is for FLIP-16 (loop fault tolerance) 
and FLIP-15 (redesign iterations) please? I can see lots of work back in 2016, 
but it all seemed to stop and go quiet since about March 2017. I see iterations 
as offering very interesting capabilities for Flink, so it would be good to 
understand how we can get this moving again.

Many thanks,

John

Sent from my iPhone





Re: Dataset statistics

2019-02-19 Thread Flavio Pompermaier
We've just published a first attempt (on Flink 1.6.2) that extracts some
descriptive statistics from a batch DataSet [1].
Any feedback is welcome.

Best,
Flavio

[1] https://github.com/okkam-it/flink-descriptive-stats
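
A tiny illustrative sketch (not taken from the linked project) of computing a few
describe()-style numbers with plain DataSet aggregations on one numeric column; the sample
values are made up:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1<Double>> column =
        env.fromElements(Tuple1.of(1.0), Tuple1.of(4.0), Tuple1.of(2.5));

long count = column.count();                     // triggers a job
double min = column.min(0).collect().get(0).f0;  // aggregate on tuple field 0
double max = column.max(0).collect().get(0).f0;
double sum = column.sum(0).collect().get(0).f0;

System.out.printf("count=%d min=%.2f max=%.2f mean=%.2f%n",
        count, min, max, sum / count);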

On Thu, Feb 14, 2019 at 11:19 AM Flavio Pompermaier 
wrote:

> No effort in this direction, then?
> I had a try using SQL on Table API but I fear that the generated plan is
> not the optimal one..I'm looking for an efficient way to implement
> describe() method on a table or dataset/datasource
>
> On Fri, Feb 8, 2019 at 10:35 AM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> is there any effort to standardize descriptive statistics in Apache Flink?
>> Is there any suggested way to achieve this?
>>
>> Best,
>> Flavio
>>
>
>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


RE: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Martin, Nick
Yeah, that’s expected/known. Watermarks for the empty partition don’t advance, 
so the window in your window function never closes.

There’s a ticket open to fix it 
(https://issues.apache.org/jira/browse/FLINK-5479) for the kafka connector, but 
in general any time one parallel instance of a source function isn’t getting 
data you have to watch out for this.

From: Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
Sent: Tuesday, February 19, 2019 6:32 AM
To: user 
Subject: EXT :Re: How to debug difference between Kinesis and Kafka

Hmmm my suspicions are now quite high. I created a file source that just 
replays the events straight then I get more results

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
Hmmm after expanding the dataset such that there was additional data that ended 
up on shard-0 (everything in my original dataset was coincidentally landing on 
shard-1) I am now getting output... should I expect this kind of behaviour if 
no data arrives at shard-0 ever?

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:
Hi, I’m having a strange situation and I would like to know where I should 
start trying to debug.

I have set up a configurable swap in source, with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

From injecting a log and no-op map function I can see that all three sources 
pass through the events correctly.

I then have a window based on event time stamps… and from inspecting the 
aggregation function I can see that the data is getting aggregated…, I’m using 
the `.aggregate(AggregateFunction.WindowFunction)` variant so that I can 
retrieve the key

Here’s the strange thing, I only change the source (and each source uses the 
same deserialization function) but:


  *   When I use either Kafka or my Mock source, the WindowFunction gets called 
as events pass the end of the window
  *   When I use the Kinesis source, however, the window function never gets 
called. I have even tried injecting events into kinesis with really high 
timestamps to flush the watermarks in my 
BoundedOutOfOrdernessTimestampExtractor... but nothing
I cannot see how this source switching could result in such a different 
behaviour:

Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
case "kinesis":
sourceFactory = FlinkKinesisConsumer::new;
copyOptionalArg(configParams, "aws-region", sourceProperties, 
AWSConfigConstants.AWS_REGION);
copyOptionalArg(configParams, "aws-endpoint", sourceProperties, 
AWSConfigConstants.AWS_ENDPOINT);
copyOptionalArg(configParams, "aws-access-key", 
sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
copyOptionalArg(configParams, "aws-secret-key", 
sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
copyOptionalArg(configParams, "aws-profile", sourceProperties, 
AWSConfigConstants.AWS_PROFILE_NAME);
break;
case "kafka":
sourceFactory = FlinkKafkaConsumer010::new;
copyRequiredArg(configParams, "bootstrap-server", 
sourceProperties, "bootstrap.servers");
copyOptionalArg(configParams, "group-id", sourceProperties, 
"group.id");
break;
case "mock":
sourceFactory = MockSourceFunction::new;
break;
default:
throw new RuntimeException("Unknown source '" + sourceName + 
'\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because using 
BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream eventsByType = env.addSource(sourceFactory.create(
configParams.getRequired("topic"),
new ObjectNodeDeserializationSchema(),
sourceProperties
))
.returns(ObjectNode.class) // the use of ConsumerFactory erases 
the type info so add it back
.name("raw-events")
.assignTimestampsAndWatermarks(
new 
ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
)
.split(new JsonNodeOutputSelector("eventType"));
...
eventsByType.select(...)
.keyBy(new JsonNodeStringKeySelector("_key"))

.window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),

Re: Get nested Rows from Json string

2019-02-19 Thread françois lacombe
Hi Rong,

Thank you for the JIRA.
Understood that it may be solved in a future release; I'll comment on the ticket in
case of further input.

All the best

François
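
A minimal sketch with org.apache.flink.types.Row of the nesting behaviour discussed further
down in this thread: the child Row is stored as a single field of its parent, and only
Row.toString makes it look flattened:

import org.apache.flink.types.Row;

Row child = new Row(1);            // the nested object {"f":"g"}
child.setField(0, "g");

Row parent = new Row(3);           // arity 3 for fields a, c, e
parent.setField(0, "b");
parent.setField(1, "d");
parent.setField(2, child);         // field 2 holds the child Row itself

System.out.println(parent.getArity()); // 3 -- the child is not appended
System.out.println(parent);            // prints "b,d,g" because toString flattens nested Rows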

On Sat, Feb 9, 2019 at 00:57, Rong Rong wrote:

> Hi François,
>
> I just did some research and seems like this is in fact a Stringify issue.
> If you try running one of the AvroRowDeSerializationSchemaTest [1],
> you will find out that only MAP, ARRAY are correctly stringify (Map using
> "{}" quote and Array using "[]" quote).
> However nested records are not quoted using "()".
>
> Wasn't sure if this is consider as a bug for the toString method of the
> type Row. I just filed a JIRA [2] for this issue, feel free to comment on
> the discussion.
>
> --
> Rong
>
> [1]
> https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
> [2] https://issues.apache.org/jira/browse/FLINK-11569
>
> On Fri, Feb 8, 2019 at 8:51 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Rong,
>>
>> Thank you for this answer.
>> I've changed Rows to Map, which ease the conversion process.
>>
>> Nevertheless I'm interested in any explanation about why row1.setField(i,
>> row2) appeends row2 at the end of row1.
>>
>> All the best
>>
>> François
>>
>> On Wed, Feb 6, 2019 at 19:33, Rong Rong wrote:
>>
>>> Hi François,
>>>
>>> I wasn't exactly sure this is a JSON object or JSON string you are
>>> trying to process.
>>> For a JSON string this [1] article might help.
>>> For a JSON object, I am assuming you are trying to convert it into a
>>> TableSource and processing using Table/SQL API, you could probably use the
>>> example here [2]
>>>
>>> BTW, a very remote hunch, this might be just a stringify issue how you
>>> print the row out.
>>>
>>> --
>>> Rong
>>>
>>> [1]:
>>> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>>>
>>> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
>>> francois.laco...@dcbrain.com> wrote:
>>>
 Hi all,

 I currently get a json string from my pgsql source with nested objects
 to be converted into Flink's Row.
 Nested json objects should go in nested Rows.
 An avro schema rules the structure my source should conform to.

 According to this json :
 {
   "a":"b",
   "c":"d",
   "e":{
"f":"g"
}
 }

 ("b", "d", Row("g")) is expected as a result according to my avro
 schema.

 I wrote a recursive method which iterate over json objects and put
 nested Rows at right indices in their parent but here is what outputs :
 ("b", "d", "g")
 Child Row is appended to the parent. I don't understand why.
 Obviously, process is crashing arguing the top level Row arity doesn't
 match serializers.

 Is there some native methods in Flink to achieve that?
 I don't feel so comfortable to have written my own json processor for
 this job.

 Do you have any hint which can help please ?

 All the best

 François



    

 


>>>
>>
>>    
>> 
>> 
>>
>>
>

-- 

       
   





Re: How to load multiple same-format files with single batch job?

2019-02-19 Thread françois lacombe
Hi Fabian,

After a bit more documentation reading I have a better understanding of how
the InputFormat interface works.
Indeed, I'd better wrap a custom InputFormat implementation in my source.
This article helped a lot:
https://brewing.codes/2017/02/06/implementing-flink-batch-data-connector/

connect() will be for a next sprint

All the best

François
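
A minimal sketch of the directory-reading behaviour described below, using a plain text
InputFormat (the path and the nested-enumeration flag are illustrative; a custom GeoJSON
InputFormat would be wrapped the same way):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Pointing a file-based InputFormat at a directory makes it read every file in it.
TextInputFormat format = new TextInputFormat(new Path("file:///data/geojson-dir"));
format.setNestedFileEnumeration(true); // also descend into sub-directories

DataSet<String> lines = env.createInput(format);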

On Fri, Feb 15, 2019 at 09:37, Fabian Hueske wrote:

> H François,
>
> The TableEnvironment.connect() method can only be used if you provide
> (quite a bit) more code.
> It requires a TableSourceFactory and handling of all the properties that
> are defined in the other builder methods. See [1].
>
> I would recommend to either register the BatchTableSource directly
> (tEnv.registerTableSource()) or get a DataSet (via env.createSource()) and
> register the DataSet as a Table (tEnv.registerDataSet()).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sourceSinks.html#define-a-tablefactory
>
>
> On Mon, Feb 11, 2019 at 21:09, françois lacombe <francois.laco...@dcbrain.com> wrote:
>
>> Hi Fabian,
>>
>> I've got issues for a custom InputFormat implementation with my existing
>> code.
>>
>> Is this can be used in combination with a BatchTableSource custom source?
>> As I understand your solution, I should move my source to implementations
>> like :
>>
>> tableEnvironment
>>   .connect(...)
>>   .withFormat(...)
>>   .withSchema(...)
>>   .inAppendMode()
>>   .registerTableSource("MyTable")
>>
>> right?
>>
>> I currently have a BatchTableSource class which produce a DataSet
>> from a single geojson file.
>> This doesn't sound compatible with a custom InputFormat, don't you?
>>
>> Thanks in advance for any addition hint, all the best
>>
>> François
>>
>> On Mon, Feb 4, 2019 at 12:10, Fabian Hueske wrote:
>>
>>> Hi,
>>>
>>> The files will be read in a streaming fashion.
>>> Typically files are broken down into processing splits that are
>>> distributed to tasks for reading.
>>> How a task reads a file split depends on the implementation, but usually
>>> the format reads the split as a stream and does not read the split as a
>>> whole before emitting records.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Mon, Feb 4, 2019 at 12:06, françois lacombe <francois.laco...@dcbrain.com> wrote:
>>>
 Hi Fabian,

 Thank you for this input.
 This is interesting.

 With such an input format, will all the file will be loaded in memory
 before to be processed or will all be streamed?

 All the best
 François

 On Tue, Jan 29, 2019 at 22:20, Fabian Hueske wrote:

> Hi,
>
> You can point a file-based input format to a directory and the input
> format should read all files in that directory.
> That works as well for TableSources that are internally use file-based
> input formats.
> Is that what you are looking for?
>
> Best, Fabian
>
> On Mon, Jan 28, 2019 at 17:22, françois lacombe <francois.laco...@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I'm wondering if it's possible and what's the best way to achieve the
>> loading of multiple files with a Json source to a JDBC sink ?
>> I'm running Flink 1.7.0
>>
>> Let's say I have about 1500 files with the same structure (same
>> format, schema, everything) and I want to load them with a *batch* job
>> Can Flink handle the loading of one and each file in a single source
>> and send data to my JDBC sink?
>> I wish I can provide the URL of the directory containing my thousand
>> files to the batch source to make it load all of them sequentially.
>> My sources and sinks are currently available for BatchTableSource, I
>> guess the cost to make them available for streaming would be quite
>> expensive for me for the moment.
>>
>> Have someone ever done this?
>> Am I wrong to expect doing so with a batch job?
>>
>> All the best
>>
>> François Lacombe
>>
>>
>> 
>> 
>> 
>> 
>>
>>
>

    

 


>>>
>>
>>    
>> 
>> 
>>
>>
>

-- 

    

Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hmmm my suspicions are now quite high. I created a file source that just
replays the events straight then I get more results

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hmmm after expanding the dataset such that there was additional data that
> ended up on shard-0 (everything in my original dataset was coincidentally
> landing on shard-1) I am now getting output... should I expect this kind of
> behaviour if no data arrives at shard-0 ever?
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hi, I’m having a strange situation and I would like to know where I
>> should start trying to debug.
>>
>> I have set up a configurable swap in source, with three implementations:
>>
>> 1. A mock implementation
>> 2. A Kafka consumer implementation
>> 3. A Kinesis consumer implementation
>>
>> From injecting a log and no-op map function I can see that all three
>> sources pass through the events correctly.
>>
>> I then have a window based on event time stamps… and from inspecting the
>> aggregation function I can see that the data is getting aggregated…, I’m
>> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
>> can retrieve the key
>>
>> Here’s the strange thing, I only change the source (and each source uses
>> the same deserialization function) but:
>>
>>
>>- When I use either Kafka or my Mock source, the WindowFunction gets
>>called as events pass the end of the window
>>- When I use the Kinesis source, however, the window function never
>>gets called. I have even tried injecting events into kinesis with really
>>high timestamps to flush the watermarks in my
>>BoundedOutOfOrdernessTimestampExtractor... but nothing
>>
>> I cannot see how this source switching could result in such a different
>> behaviour:
>>
>> Properties sourceProperties = new Properties();
>> ConsumerFactory sourceFactory;
>> String sourceName = configParams.getRequired("source");
>> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>> case "kinesis":
>> sourceFactory = FlinkKinesisConsumer::new;
>> copyOptionalArg(configParams, "aws-region",
>> sourceProperties, AWSConfigConstants.AWS_REGION);
>> copyOptionalArg(configParams, "aws-endpoint",
>> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>> copyOptionalArg(configParams, "aws-access-key",
>> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>> copyOptionalArg(configParams, "aws-secret-key",
>> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>> copyOptionalArg(configParams, "aws-profile",
>> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>> break;
>> case "kafka":
>> sourceFactory = FlinkKafkaConsumer010::new;
>> copyRequiredArg(configParams, "bootstrap-server",
>> sourceProperties, "bootstrap.servers");
>> copyOptionalArg(configParams, "group-id",
>> sourceProperties, "group.id");
>> break;
>> case "mock":
>> sourceFactory = MockSourceFunction::new;
>> break;
>> default:
>> throw new RuntimeException("Unknown source '" +
>> sourceName + '\'');
>> }
>>
>> // set up the streaming execution environment
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // poll watermark every second because using
>> BoundedOutOfOrdernessTimestampExtractor
>> env.getConfig().setAutoWatermarkInterval(1000L);
>> env.enableCheckpointing(5000);
>>
>> SplitStream eventsByType =
>> env.addSource(sourceFactory.create(
>> configParams.getRequired("topic"),
>> new ObjectNodeDeserializationSchema(),
>> sourceProperties
>> ))
>> .returns(ObjectNode.class) // the use of ConsumerFactory
>> erases the type info so add it back
>> .name("raw-events")
>> .assignTimestampsAndWatermarks(
>> new
>> ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp",
>> Time.seconds(5))
>> )
>> .split(new JsonNodeOutputSelector("eventType"));
>> ...
>> eventsByType.select(...)
>> .keyBy(new JsonNodeStringKeySelector("_key"))
>>
>> .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
>> (KeySelector)
>> TasksMain::offsetPerMaster))
>> .trigger(EventTimeTrigger.create())
>> .aggregate(new CountsAggregator<>(), new KeyTagger<>())
>> // < The CountsAggregator is seeing the data
>> .print() // < HERE is where we get no output from
>> Kinesis... but Kafka and my Mock are 

Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hmmm, after expanding the dataset so that some additional data ended up on
shard-0 (everything in my original dataset was coincidentally landing on
shard-1), I am now getting output... Should I expect this kind of behaviour
if no data ever arrives at shard-0?
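
This is the expected behaviour for event-time windows on the setup above: an
operator's watermark is the minimum of the watermarks of all its parallel
inputs, so a source subtask that owns an empty shard never advances its
watermark and holds every downstream window back. A possible workaround on
the 1.7-era API is a periodic watermark assigner that falls back to
processing time once a subtask has been idle for a while. The sketch below
is only an illustration: it assumes plain Jackson ObjectNode events with an
epoch-millis "timestamp" field, reuses the 5-second out-of-orderness bound
from the job in this thread, and picks an arbitrary idle timeout.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import com.fasterxml.jackson.databind.node.ObjectNode; // assumption: plain Jackson ObjectNode

// Sketch of an idleness-aware replacement for the
// ObjectNodeBoundedOutOfOrdernessTimestampExtractor used in this thread.
public class IdleAwareTimestampExtractor implements AssignerWithPeriodicWatermarks<ObjectNode> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L; // matches Time.seconds(5) above
    private static final long IDLE_TIMEOUT_MS = 60_000L;        // assumption: tune to the traffic

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;
    private long lastEventWallClock = System.currentTimeMillis();

    @Override
    public long extractTimestamp(ObjectNode element, long previousElementTimestamp) {
        long ts = element.get("timestamp").asLong();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        lastEventWallClock = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastEventWallClock > IDLE_TIMEOUT_MS) {
            // No data seen on this subtask (e.g. it owns an empty Kinesis shard):
            // advance on wall-clock time so it does not hold back the job's watermark.
            return new Watermark(now - MAX_OUT_OF_ORDERNESS_MS);
        }
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}

Note that mixing wall-clock watermarks with event time means data that later
arrives on the previously idle shard may be dropped as late, so the idle
timeout should be chosen generously.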

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hi, I’m having a strange situation and I would like to know where I should
> start trying to debug.
>
> I have set up a configurable, swappable source with three implementations:
>
> 1. A mock implementation
> 2. A Kafka consumer implementation
> 3. A Kinesis consumer implementation
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
> I then have a window based on event timestamps… and from inspecting the
> aggregation function I can see that the data is getting aggregated. I’m
> using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
> can retrieve the key.
>
> Here’s the strange thing: I only change the source (and each source uses
> the same deserialization function), but:
>
>
>- When I use either Kafka or my Mock source, the WindowFunction gets
>called as events pass the end of the window
>- When I use the Kinesis source, however, the window function never
>gets called. I have even tried injecting events into kinesis with really
>high timestamps to flush the watermarks in my
>BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a different
> behaviour:
>
> Properties sourceProperties = new Properties();
> ConsumerFactory sourceFactory;
> String sourceName = configParams.getRequired("source");
> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> case "kinesis":
> sourceFactory = FlinkKinesisConsumer::new;
> copyOptionalArg(configParams, "aws-region",
> sourceProperties, AWSConfigConstants.AWS_REGION);
> copyOptionalArg(configParams, "aws-endpoint",
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> copyOptionalArg(configParams, "aws-access-key",
> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> copyOptionalArg(configParams, "aws-secret-key",
> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> copyOptionalArg(configParams, "aws-profile",
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> break;
> case "kafka":
> sourceFactory = FlinkKafkaConsumer010::new;
> copyRequiredArg(configParams, "bootstrap-server",
> sourceProperties, "bootstrap.servers");
> copyOptionalArg(configParams, "group-id",
> sourceProperties, "group.id");
> break;
> case "mock":
> sourceFactory = MockSourceFunction::new;
> break;
> default:
> throw new RuntimeException("Unknown source '" + sourceName
> + '\'');
> }
>
> // set up the streaming execution environment
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
> env.getConfig().setAutoWatermarkInterval(1000L);
> env.enableCheckpointing(5000);
>
> SplitStream eventsByType =
> env.addSource(sourceFactory.create(
> configParams.getRequired("topic"),
> new ObjectNodeDeserializationSchema(),
> sourceProperties
> ))
> .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
> .name("raw-events")
> .assignTimestampsAndWatermarks(
> new
> ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp",
> Time.seconds(5))
> )
> .split(new JsonNodeOutputSelector("eventType"));
> ...
> eventsByType.select(...)
> .keyBy(new JsonNodeStringKeySelector("_key"))
>
> .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
> (KeySelector)
> TasksMain::offsetPerMaster))
> .trigger(EventTimeTrigger.create())
> .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // < The CountsAggregator is seeing the data
> .print() // < HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
>
>
>


FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread John Tipper
Hi All,

Does anyone know what the current status is for FLIP-16 (loop fault tolerance) 
and FLIP-15 (redesign iterations) please? I can see lots of work back in 2016, 
but it all seemed to stop and go quiet since about March 2017. I see iterations 
as offering very interesting capabilities for Flink, so it would be good to 
understand how we can get this moving again.

Many thanks,

John

Sent from my iPhone

How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hi, I’m having a strange situation and I would like to know where I should
start trying to debug.

I have set up a configurable, swappable source with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

From injecting a log and no-op map function I can see that all three
sources pass through the events correctly.

I then have a window based on event timestamps… and from inspecting the
aggregation function I can see that the data is getting aggregated. I’m
using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I
can retrieve the key.
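
(For reference, the two-argument aggregate variant pairs an incremental
AggregateFunction with a WindowFunction that is invoked once per key and
window with the pre-aggregated result, which is how the key can be attached
to the output. The CountsAggregator and KeyTagger used in this thread are
not shown, so the sketch below only illustrates the general shape with
hypothetical names: a count per key emitted as a Tuple2.)

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Incrementally counts elements; only the running count is kept as window state.
class CountAggregate<T> implements AggregateFunction<T, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(T value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}

// Receives the single pre-aggregated value per key and window and attaches the key.
class AttachKey<K> implements WindowFunction<Long, Tuple2<K, Long>, K, TimeWindow> {
    @Override
    public void apply(K key, TimeWindow window, Iterable<Long> counts, Collector<Tuple2<K, Long>> out) {
        out.collect(Tuple2.of(key, counts.iterator().next()));
    }
}

// Usage: .aggregate(new CountAggregate<>(), new AttachKey<>())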

Here’s the strange thing: I only change the source (and each source uses
the same deserialization function), but:


   - When I use either Kafka or my Mock source, the WindowFunction gets
   called as events pass the end of the window
   - When I use the Kinesis source, however, the window function never gets
   called. I have even tried injecting events into kinesis with really high
   timestamps to flush the watermarks in my
   BoundedOutOfOrdernessTimestampExtractor... but nothing

I cannot see how this source switching could result in such a different
behaviour:

Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
case "kinesis":
sourceFactory = FlinkKinesisConsumer::new;
copyOptionalArg(configParams, "aws-region",
sourceProperties, AWSConfigConstants.AWS_REGION);
copyOptionalArg(configParams, "aws-endpoint",
sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
copyOptionalArg(configParams, "aws-access-key",
sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
copyOptionalArg(configParams, "aws-secret-key",
sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
copyOptionalArg(configParams, "aws-profile",
sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
break;
case "kafka":
sourceFactory = FlinkKafkaConsumer010::new;
copyRequiredArg(configParams, "bootstrap-server",
sourceProperties, "bootstrap.servers");
copyOptionalArg(configParams, "group-id", sourceProperties,
"group.id");
break;
case "mock":
sourceFactory = MockSourceFunction::new;
break;
default:
throw new RuntimeException("Unknown source '" + sourceName
+ '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream eventsByType =
env.addSource(sourceFactory.create(
configParams.getRequired("topic"),
new ObjectNodeDeserializationSchema(),
sourceProperties
))
.returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
.name("raw-events")
.assignTimestampsAndWatermarks(
new
ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp",
Time.seconds(5))
)
.split(new JsonNodeOutputSelector("eventType"));
...
eventsByType.select(...)
.keyBy(new JsonNodeStringKeySelector("_key"))

.window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
(KeySelector)
TasksMain::offsetPerMaster))
.trigger(EventTimeTrigger.create())
.aggregate(new CountsAggregator<>(), new KeyTagger<>()) // < The CountsAggregator is seeing the data
.print() // < HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
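
(As the follow-up earlier in this digest notes, the missing output turned out
to be caused by a Kinesis shard that never received any data, which kept that
source subtask's watermark at its minimum and therefore held back the window.
On Flink 1.11 or later (not the version used in this thread), the same
situation can be handled declaratively with WatermarkStrategy#withIdleness.
A minimal sketch, again assuming plain Jackson ObjectNode events with an
epoch-millis "timestamp" field:)

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import com.fasterxml.jackson.databind.node.ObjectNode; // assumption: plain Jackson ObjectNode

class IdleAwareWatermarks {

    // Would replace the BoundedOutOfOrdernessTimestampExtractor used above, via
    // events.assignTimestampsAndWatermarks(IdleAwareWatermarks.strategy()).
    static WatermarkStrategy<ObjectNode> strategy() {
        return WatermarkStrategy.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previousTimestamp) -> event.get("timestamp").asLong())
                // Source subtasks that see no data for one minute are marked idle and
                // no longer hold back the downstream watermark.
                .withIdleness(Duration.ofMinutes(1));
    }
}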


Re: Starting Flink cluster and running a job

2019-02-19 Thread Konstantin Knauf
Hi Boris,

without looking at the entrypoint in much detail, generally there should
not be a race condition there:

* if the TaskManagers cannot connect to the ResourceManager, they will
retry (by default the timeout is 5 minutes)
* if the JobManager does not get enough resources from the ResourceManager,
it will also wait for the resources/slots to be provided. The timeout there
is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager
containers can reach the Jobmanager eventually.

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers,

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Following
> https://github.com/apache/flink/tree/release-1.7/flink-container/docker
> I have created an entry point, which looks as follows:
>
> #!/bin/sh
>
> #   from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
> #   and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
>
> # If unspecified, the hostname of the container is taken as the JobManager address
> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>
> drop_privs_cmd() {
>     if [ $(id -u) != 0 ]; then
>         # Don't need to drop privs if EUID != 0
>         return
>     elif [ -x /sbin/su-exec ]; then
>         # Alpine
>         echo su-exec flink
>     else
>         # Others
>         echo gosu flink
>     fi
> }
>
> JOB_MANAGER="jobmanager"
> TASK_MANAGER="taskmanager"
>
> CMD="$1"
> shift
>
> if [ "${CMD}" = "help" ]; then
>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>     exit 0
> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>
>         echo "Starting Task Manager"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>     else
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>
>         if [ -z "$1" ]; then
>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>         else
>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>         fi
>     fi
> fi
>
> exec "$@"
>
> It does work for all the cases, except running a standalone job.
> The problem, as I understand it, is a race condition.
> In Kubernetes it takes several attempts to establish the connection between
> the Job and Task managers, while standalone-job.sh
> tries to start a job immediately once the cluster is created (before the
> connection is established).
> Is there a better way to start a job on container startup?
>
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Jira issue Flink-11127

2019-02-19 Thread Konstantin Knauf
Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only
thing you need to do is to set the "taskmanager.host" to the Pod's IP
address in the Flink configuration. The easiest way to do this is to pass
this config dynamically via a command-line parameter.

The Deployment spec could look something like this:

containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]

  env:
  - name: K8S_POD_IP
valueFrom:
  fieldRef:
fieldPath: status.podIP


Hope this helps and let me know if this works.

Best,

Konstantin

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> I was looking at this issue
> https://issues.apache.org/jira/browse/FLINK-11127
> Apparently there is a workaround for it.
> Is it possible to provide the complete Helm chart for it?
> Bits and pieces are in the ticket, but it would be nice to see the full
> chart.
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Stream enrichment with static data, side inputs for DataStream

2019-02-19 Thread Artur Mrozowski
Hi,
I have a stream of buildings, and each building has a foreign key reference
to a municipality. The municipalities data set is quite static. Both are
placed on Kafka topics. I want to enrich each building with the municipality
name.

The FLIP-17 proposal would be ideal for this use case, but it's still just a
proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

My question is how do you work around issues like this?

Best regards
Artur
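
(A common workaround for this pattern, until something like FLIP-17 lands, is
to broadcast the slowly changing stream and keep it in broadcast state, then
connect it to the keyed building stream. The sketch below is only an
illustration: Building, Municipality and EnrichedBuilding are hypothetical
POJOs, with Building.municipalityId referring to Municipality.id.)

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BuildingEnricher {

    // Hypothetical POJOs for illustration only.
    public static class Building { public long municipalityId; }
    public static class Municipality { public long id; public String name; }
    public static class EnrichedBuilding {
        public Building building;
        public String municipalityName;
        public EnrichedBuilding() {}
        public EnrichedBuilding(Building b, String name) { building = b; municipalityName = name; }
    }

    // Broadcast state: municipality id -> municipality name.
    static final MapStateDescriptor<Long, String> MUNICIPALITIES =
            new MapStateDescriptor<>("municipalities", Types.LONG, Types.STRING);

    public static DataStream<EnrichedBuilding> enrich(DataStream<Building> buildings,
                                                      DataStream<Municipality> municipalities) {
        BroadcastStream<Municipality> broadcast = municipalities.broadcast(MUNICIPALITIES);

        return buildings
                .keyBy(b -> b.municipalityId)
                .connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<Long, Building, Municipality, EnrichedBuilding>() {

                    @Override
                    public void processElement(Building building, ReadOnlyContext ctx,
                                               Collector<EnrichedBuilding> out) throws Exception {
                        // May be null if the building arrives before its municipality; a real job
                        // could buffer such buildings in keyed state and flush them later.
                        String name = ctx.getBroadcastState(MUNICIPALITIES).get(building.municipalityId);
                        out.collect(new EnrichedBuilding(building, name));
                    }

                    @Override
                    public void processBroadcastElement(Municipality m, Context ctx,
                                                        Collector<EnrichedBuilding> out) throws Exception {
                        ctx.getBroadcastState(MUNICIPALITIES).put(m.id, m.name);
                    }
                });
    }
}

Since the municipalities topic is nearly static, replaying it once at startup
populates the broadcast state on every parallel instance; the keyed building
stream can then read it locally without shuffling the municipality data.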


Re: subscribe

2019-02-19 Thread Artur Mrozowski
Will do, thanks!

On Tue, Feb 19, 2019 at 8:57 AM Fabian Hueske  wrote:

> Hi Artur,
>
> In order to subscribe to Flink's user mailing list you need to send a mail
> to user-subscr...@flink.apache.org
>
> Best, Fabian
>
> Am Mo., 18. Feb. 2019 um 20:34 Uhr schrieb Artur Mrozowski <
> art...@gmail.com>:
>
>> art...@gmail.com
>>
>