Re: Something wrong with the until condition FLINK-CEP

2018-12-10 Thread bupt_ljy
Sorry, it seems that I misunderstood the concept of the composition of the 
until condition and oneOrMore.
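For context: until() attaches a stop condition to the looping state created by oneOrMore(). It is evaluated against each incoming event and, when it fires, the loop stops accepting further events; it is not a requirement that some event eventually satisfy it. With prices of 1.0/2.0/2.0 the stop condition in the quoted test never fires, so the expected match (c, a1, a2) is still produced and the test passes. A minimal sketch of that reading, using the same Flink 1.6 CEP classes as the quoted test (not code from the thread):

Pattern<Event, ?> untilSketch = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return value.getName().equals("c");
        }
    })
    .followedBy("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return value.getName().equals("a");   // every looping "middle" event must match this
        }
    })
    .oneOrMore().greedy()
    .until(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return value.getPrice() > 3.0;        // stop condition: only cuts the loop short if it ever fires
        }
    });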


Original Message
Sender:bupt_ljybupt_...@163.com
Recipient:useru...@flink.apache.org
Date:Tuesday, Dec 11, 2018 14:00
Subject:Something wrong with the until condition FLINK-CEP


Hi all,
 I seem to have found a problem with the until condition in 
testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier in GreedyITCase.java. I 
modified the unit test a little bit, like this:


@Test
public void testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier() throws Exception {
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();

    Event c = new Event(40, "c", 1.0);
    Event a1 = new Event(41, "a", 2.0);
    Event a2 = new Event(42, "d", 2.0);

    inputEvents.add(new StreamRecord<>(c, 1));
    inputEvents.add(new StreamRecord<>(a1, 2));
    inputEvents.add(new StreamRecord<>(a2, 3));

    // c a* d
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    }).oneOrMore().greedy().until(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getPrice() > 3.0;
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("d");
        }
    });

    NFA<Event> nfa = compile(pattern, false);

    final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);

    compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
        Lists.newArrayList(c, a1, a2)
    ));
}


I think this should fail because no event satisfies the until condition, but 
it works.
Flink version is 1.6.


Best,
Jiayi Liao

Something wrong with the until condition FLINK-CEP

2018-12-10 Thread bupt_ljy
Hi all,
 I seem to have found a problem with the until condition in 
testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier in GreedyITCase.java. I 
modified the unit test a little bit, like this:


@Test
public void testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier() throws Exception {
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();

    Event c = new Event(40, "c", 1.0);
    Event a1 = new Event(41, "a", 2.0);
    Event a2 = new Event(42, "d", 2.0);

    inputEvents.add(new StreamRecord<>(c, 1));
    inputEvents.add(new StreamRecord<>(a1, 2));
    inputEvents.add(new StreamRecord<>(a2, 3));

    // c a* d
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    }).oneOrMore().greedy().until(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getPrice() > 3.0;
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("d");
        }
    });

    NFA<Event> nfa = compile(pattern, false);

    final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);

    compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
        Lists.newArrayList(c, a1, a2)
    ));
}


I think this should fail because no event satisfies the until condition, but 
it works.
Flink version is 1.6.


Best,
Jiayi Liao

After job cancel, leftover ZK state prevents job manager startup

2018-12-10 Thread Micah Wylde
Hello,

We've been seeing an issue with several Flink 1.5.4 clusters that looks
like this:

1. Job is cancelled with a savepoint
2. The jar is deleted from our HA blobstore (S3)
3. The jobgraph in ZK is *not* deleted
4. We restart the cluster
5. Startup fails in recovery because the jar is not available, with the
stacktrace:

00:13:58.486 ERROR o.a.f.r.e.ClusterEntrypoint - Fatal error occurred in
the cluster entrypoint.
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
  at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Cannot set up the user code libraries: No such file or directory:
s3://streamingplatform-production/{JOB_NAME}/flink/highavailability/{JOB_NAME}/blob/job_5a3fe2c00c05efd3a552a1c6707d2c10/blob_p-6d585831f5c947335ac505b400cf8f3630cc706a-42355c2885b668b0bc5e15b856141b0

This superficially seems similar to several issues that have apparently
been fixed in 1.5.4, like FLINK-10255 and FLINK-10184.

Has anybody else seen this issue on 1.5.4 (or later) clusters? Or any
advice for debugging?

Thanks,
Micah


Re: Error while reading from hadoop sequence file

2018-12-10 Thread Akshay Mendole
Could anyone please help me with this?
Thanks,
Akshay

On Mon, 10 Dec 2018, 6:05 pm Akshay Mendole wrote:
> Hi,
>    I have been facing issues while trying to read from an HDFS sequence
> file.
>
> This is my code snippet
>
> DataSource<Tuple2<Text, Text>> input = env
> .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir),
> TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {
> }));
>
>
> Upon executing this in yarn cluster mode, I am getting following error
> The type returned by the input format could not be automatically
> determined. Please specify the TypeInformation of the produced type
> explicitly by using the 'createInput(InputFormat, TypeInformation)' method
> instead.
>
> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
> flipkart.EnrichementFlink.main(EnrichementFlink.java:31)
>
>
> When I add the TypeInformation myself as follows, I run into the same
> issue.
>
> DataSource<Tuple2<Text, Text>> input = env
> .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir));
>
>
>
>
> When I add these libraries in the lib folder,
> flink-hadoop-compatibility_2.11-1.7.0.jar
>
>
> the error changes to this
>
> java.lang.NoClassDefFoundError:
> org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
> at
> org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
> at
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
> at
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
> at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
> at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
> at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
> at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
> at
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
> at
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>
>
> Can someone help me resolve this issue?
>
> Thanks,
> Akshay
>
>
>
>


Re: runtime.resourcemanager

2018-12-10 Thread Alieh

Hello,

this is the task manager log, but it does not change after I run the 
program. I think the Flink planner has a problem with my program. It 
cannot even start the job.


Best,

Alieh


2018-12-10 12:20:20,386 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 

2018-12-10 12:20:20,387 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting 
TaskManager (Version: 1.6.0, Rev:ff472b4, Date:07.08.2018 @ 13:31:13 UTC)
2018-12-10 12:20:20,387 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current 
user: alieh
2018-12-10 12:20:20,609 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2018-12-10 12:20:20,768 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current 
Hadoop/Kerberos user: alieh
2018-12-10 12:20:20,769 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM: Java 
HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.161-b12
2018-12-10 12:20:20,769 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum heap 
size: 922 MiBytes
2018-12-10 12:20:20,769 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME: 
/usr/lib/jvm/java-8-oracle
2018-12-10 12:20:20,774 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop 
version: 2.4.1
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM Options:
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -XX:+UseG1GC
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
-XX:MaxDirectMemorySize=8388607T
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
-Dlog.file=/home/alieh/flink-1.6.0/log/flink-alieh-taskexecutor-0-alieh-P67A-D3-B3.log
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
-Dlog4j.configuration=file:/home/alieh/flink-1.6.0/conf/log4j.properties
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
-Dlogback.configurationFile=file:/home/alieh/flink-1.6.0/conf/logback.xml
2018-12-10 12:20:20,775 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program 
Arguments:
2018-12-10 12:20:20,776 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - --configDir
2018-12-10 12:20:20,776 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
/home/alieh/flink-1.6.0/conf
2018-12-10 12:20:20,776 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath: 
/home/alieh/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/home/alieh/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/home/alieh/flink-1.6.0/lib/log4j-1.2.17.jar:/home/alieh/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/home/alieh/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
2018-12-10 12:20:20,776 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 

2018-12-10 12:20:20,777 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered UNIX 
signal handlers for [TERM, HUP, INT]
2018-12-10 12:20:20,785 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum number 
of open file descriptors is 1048576.
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, localhost
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.size, 1024m
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.size, 1024m
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2018-12-10 12:20:20,803 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: parallelism.default, 1
2018-12-10 12:20:20,804 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: rest.port, 8081

Re: Question regarding rescale api

2018-12-10 Thread Till Rohrmann
Hi Mingliang,

Aljoscha is right. At the moment Flink does not support spreading tasks out
across all TaskManagers. This is a feature which we still need to add.
Until then, you need to set the parallelism to the number of available
slots in order to guarantee that all TaskManagers are equally used.

Cheers,
Till

On Mon, Dec 10, 2018 at 3:18 PM Aljoscha Krettek 
wrote:

> Hi,
>
> I think that, with how the assignment of tasks to slots currently works, there is
> no way of ensuring that the source tasks are evenly spread across the
> TaskManagers (TaskExecutors). The rescale() API is from a time when
> scheduling worked a bit differently in Flink, I'm afraid.
>
> I'm cc'ing Till, who might know more about scheduling.
>
> Best,
> Aljoscha
>
>
> On 10. Dec 2018, at 13:02, 祁明良  wrote:
>
>
> Hi Aljoscha,
>
> Seems you are the committer of rescale api, any help about this question?
>
> Best,
> Mingliang
>
> --
> *From:* 祁明良
> *Sent:* December 9, 2018 18:20
> *To:* user@flink.apache.org
> *Subject:* Question regarding rescale api
>
> Hi All,
>
> I see the rescale API allows us to somehow redistribute elements locally,
> but is it possible to make the upstream operator distributed evenly on task
> managers?
> For example I have 10 task managers each with 10 slots. The application
> reads data from Kafka topic with 20 partitions, then rescale it to full
> parallelism. To me it seems that the 20 slots needed to read from Kafka
> won’t be distributed evenly across the 10 task managers, which means further
> rescale still needs to shuffle data over the network.
>
>
> Best,
> Mingliang
>
>
>
> This communication may contain privileged or other confidential
> information of Red. If you have received it in error, please advise the
> sender by reply e-mail and immediately delete the message and any
> attachments without copying or disclosing the contents. Thank you.
>
>
>
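A hedged sketch of the workaround Till describes, plugged into the numbers from the question (10 TaskManagers with 10 slots each and a 20-partition Kafka topic). The topic name, Kafka properties and the map function are placeholders, not code from the thread:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RescaleSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        props.setProperty("group.id", "rescale-sketch");       // placeholder

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism = total slots (10 TMs x 10 slots), so that all
        // TaskManagers are used, as suggested above.
        env.setParallelism(100);

        env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props))
            .setParallelism(20)  // one source subtask per Kafka partition
            .rescale()           // each source subtask fans out round-robin to a subset of
                                 // downstream subtasks instead of distributing to all of them
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();  // stands in for the real processing
                }
            })
            .print();

        env.execute("rescale sketch");
    }
}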


Re: Flink issue while setting up in Kubernetes

2018-12-10 Thread Gary Yao
Hi Abhi Thakur,

We need more information to help you. What docker images are you using? Can
you share the kubernetes resource definitions? Can you share the complete
logs
of the JM and TMs? Did you follow the steps outlined in the Flink
documentation [1]?

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html

On Mon, Dec 10, 2018 at 7:29 AM Thakur, Abhi  wrote:

> We are trying to setup a single node Kubernetes cluster.
>
> 1 Job Manager and 1 Task Manager.
>
> Before we were getting an error, and we followed this thread.
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-4-issues-w-TaskManager-connecting-to-ResourceManager-td23298.html
>
> After following the above mentioned archive , we have used the following
> commands to startup the Flink services :
>
>
>
> ${FLINK_HOME}/bin/jobmanager.sh start-foreground
>
> ${FLINK_HOME}/bin/taskmanager.sh start-foreground
>
>
>
> Previously jobmanager was being started as :
>
> ${FLINK_HOME}/bin/jobmanager.sh start-foreground cluster
>
> ${FLINK_HOME}/bin/taskmanager.sh start-foreground
>
>
>
> It removed that error and now we are getting this error as shown below.
>
> We searched all the archives and hit a dead end.
>
> We have set up all ports correctly. Flink version used is 1.6.2.
>
> Thanks in advance.
>
>
>
>
>
> 2018-12-08 06:52:38,959 WARN akka.remote.ReliableDeliverySupervisor -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Disassociated]
>
>   2018-12-08 06:52:41,863 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Registering TaskManager with ResourceID 037d1c33ec0406598f2ce30472f97e65
> (akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122/user/taskmanager_0)
> at ResourceManager
>
>   2018-12-08 06:53:23,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The
> heartbeat of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed
> out.
>
>   2018-12-08 06:53:23,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Closing TaskExecutor connection e0383ee248832f639659082c70a2f4e9 because:
> The heartbeat of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed
> out.
>
>   2018-12-08 06:53:48,961 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Disassociated]
>
>   2018-12-08 06:53:53,615 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:03,601 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd]
>
>   2018-12-08 06:54:13,605 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:23,613 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:33,601 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd]
>
>   2018-12-08 06:54:33,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The
> heartbeat of TaskManager with id 037d1c33ec0406598f2ce30472f97e65 timed
> out.
>
>   2018-12-08 06:54:33,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Closing TaskExecutor connection 037d1c33ec0406598f2ce30472f97e65 because:
> The heartbeat of TaskManager with id 

Re: Question regarding rescale api

2018-12-10 Thread Aljoscha Krettek
Hi,

I think that, with how the assignment of tasks to slots currently works, there is no 
way of ensuring that the source tasks are evenly spread across the TaskManagers 
(TaskExecutors). The rescale() API is from a time when scheduling worked a bit 
differently in Flink, I'm afraid.

I'm cc'ing Till, who might know more about scheduling.

Best,
Aljoscha


> On 10. Dec 2018, at 13:02, 祁明良  wrote:
> 
> 
> Hi Aljoscha,
> 
> Seems you are the committer of rescale api, any help about this question?
> 
> Best,
> Mingliang
> 
> From: 祁明良
> Sent: December 9, 2018 18:20
> To: user@flink.apache.org
> Subject: Question regarding rescale api
>  
> Hi All,
> 
> I see the rescale API allows us to somehow redistribute elements locally, but 
> is it possible to make the upstream operator distributed evenly on task 
> managers?
> For example I have 10 task managers each with 10 slots. The application reads 
> data from Kafka topic with 20 partitions, then rescale it to full 
> parallelism. To me it seems that the 20 slots needed to read from Kafka won’t 
> be distributed evenly across the 10 task managers, which means further rescale still 
> needs to shuffle data over the network.
> 
> 
> Best,
> Mingliang
> 
>  
> This communication may contain privileged or other confidential information 
> of Red. If you have received it in error, please advise the sender by reply 
> e-mail and immediately delete the message and any attachments without copying 
> or disclosing the contents. Thank you.



[ANNOUNCE] Weekly community update #50

2018-12-10 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #50. Please post any news and
updates you want to share with the community to this thread.

# Unified core API for streaming and batch

The community started to discuss how to bring streaming and batch closer
together by implementing a common Operator abstraction on which both stream
and batch operators can run [1]. The discussion is still in its early stage
but you should subscribe to this thread if you want to stay up to date.

# Flink backward compatibility

A while ago, Thomas started a discussion about Flink's backwards
compatibility, which should not only cover its APIs, because Flink is used
by more and more third-party applications [2]. As Stephan and Chesnay
mentioned, backwards compatibility should also be guaranteed for the client
APIs and data structures (e.g. job specification).

# Enhance convenience of TableEnvironment in Table API/SQL

Jincheng started a discussion on how to improve the TableEnvironment usage
from a user's perspective. At the moment the existing inheritance structure
can be confusing to users. He, thus, proposes to change this structure to
have more meaningful names for the user [3].

# Creating Flink 1.5.6

The community discussed whether to release a last bug fix release 1.5.6 for
the 1.5.x release branch [4]. So far the unanimous feedback is positive and
in favour of creating a last 1.5.6 release.

# Usage of Flink's Python API

The community started a survey of the usage of Flink's Python APIs [5].
Please join this discussion if you want to tell how you are using Flink's
Python APIs and how it could be improved.

[1]
https://lists.apache.org/thread.html/2746759af3c92091bb743cfe028c90777f8011a064bb95e65b1fb951@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/99059c90a0a1b59a4f18a5a0fdb73e17071b17bbb036649a48bb233b@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/b740feb190fd63db3d15bfe0399097d905ea49fad83ce9ccf4c070cd@%3Cdev.flink.apache.org%3E
[5]
https://lists.apache.org/thread.html/348366080d6b87bf390efb98e5bf268620ab04a0451f8459e2f466cd@%3Cdev.flink.apache.org%3E

Cheers,
Till


Error while reading from hadoop sequence file

2018-12-10 Thread Akshay Mendole
Hi,
   I have been facing issues while trying to read from an HDFS sequence file.

This is my code snippet

DataSource<Tuple2<Text, Text>> input = env
.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class,
ravenDataDir),
TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {
}));


Upon executing this in yarn cluster mode, I am getting following error
The type returned by the input format could not be automatically
determined. Please specify the TypeInformation of the produced type
explicitly by using the 'createInput(InputFormat, TypeInformation)' method
instead.
org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
flipkart.EnrichementFlink.main(EnrichementFlink.java:31)


When I add the TypeInformation myself as follows, I run into the same issue.

DataSource<Tuple2<Text, Text>> input = env
.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class,
ravenDataDir));




When I add these libraries in the lib folder,
flink-hadoop-compatibility_2.11-1.7.0.jar


the error changes to this

java.lang.NoClassDefFoundError:
org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
at
org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at
org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
at
org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
at
org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
at
org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
at
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
at
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)


Can someone help me resolve this issue?

Thanks,
Akshay
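A hedged sketch of the explicit-TypeInformation variant that the first error message asks for, spelling out the Writable tuple type by hand. Assumptions: the flink-hadoop-compatibility jar is built against the same Flink version as the cluster (the NoClassDefFoundError for TypeSerializerSnapshot is what typically shows up when a 1.7.0 compatibility jar is paired with an older Flink runtime), and the HDFS path below is a placeholder:

import org.apache.flink.api.java.DataSource;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.Text;

public class SequenceFileReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String ravenDataDir = "hdfs:///path/to/sequence/files";  // placeholder path

        // Spell out the produced type instead of letting the input format guess it.
        DataSource<Tuple2<Text, Text>> input = env.createInput(
            HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
            new TupleTypeInfo<Tuple2<Text, Text>>(
                new WritableTypeInfo<>(Text.class),
                new WritableTypeInfo<>(Text.class)));

        input.first(10).print();
    }
}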


Re: Flink Yarn Deployment Issue - 1.7.0

2018-12-10 Thread sohimankotia
Can anyone please help?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Yarn Deployment Issue - 1.7.0

2018-12-10 Thread sohimankotia
Can anyone help?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: runtime.resourcemanager

2018-12-10 Thread Piotr Nowojski
Hi,

Have you checked task managers logs?

Piotrek

> On 8 Dec 2018, at 12:23, Alieh  wrote:
> 
> Hello Piotrek,
> 
> thank you for your answer. I installed Flink on a local cluster and used 
> the GUI in order to monitor the task managers. It seems the program does not 
> start at all. The whole time just the job manager is struggling... For very 
> very toy examples, after a long time (during this time I see the job manager 
> logs as I mentioned before),  the job is started and can be executed in 2 
> seconds.  
> 
> Best,
> 
> Alieh
> 
> 
> On 12/07/2018 10:43 AM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Please investigate logs/standard output/error from the task manager that has 
>> failed (the logs that you showed are from job manager). Probably there is 
>> some obvious error/exception explaining why has it failed. Most common 
>> reasons:
>> - out of memory
>> - long GC pause
>> - seg fault or other error from some native library
>> - task manager killed via for example SIGKILL
>> 
>> Piotrek
>> 
>>> On 6 Dec 2018, at 17:34, Alieh wrote:
>>> 
>>> Hello all,
>>> 
>>> I have an algorithm x() which contains several joins and uses Gelly 
>>> ConnectedComponents three times. The problem is that if I call x() inside a 
>>> script more than three times, I receive the messages listed below in the 
>>> log and the program is somehow stopped. It happens even if I run it with a 
>>> toy example of a graph with fewer than 10 vertices. Do you have any clue 
>>> what the problem is?
>>> 
>>> Cheers,
>>> 
>>> Alieh
>>> 
>>> 
>>> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Trigger heartbeat request.
>>> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Trigger heartbeat request.
>>> 129150 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat 
>>> request from e80ec35f3d0a04a68000ecbdc555f98b.
>>> 129150 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Received heartbeat from 78cdd7a4-0c00-4912-992f-a2990a5d46db.
>>> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Received new slot report from TaskManager 
>>> 78cdd7a4-0c00-4912-992f-a2990a5d46db.
>>> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received 
>>> slot report from instance 4c3e3654c11b09fbbf8e993a08a4c2da.
>>> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Release 
>>> TaskExecutor 4c3e3654c11b09fbbf8e993a08a4c2da because it exceeded the idle 
>>> timeout.
>>> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 
>>> 78cdd7a4-0c00-4912-992f-a2990a5d46db could not be stopped.
>>> 
>> 
> 



Re: Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

2018-12-10 Thread Avi Levi
Got it, my bad. I should have used a bucket assigner. This seems to be working fine:
StreamingFileSink.forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()
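For readers on the Java API, a hedged rendering of the same builder chain (the Request class and the surrounding DataStream are assumptions, not code from the thread). Bulk formats roll part files on checkpoints, so checkpointing must stay enabled for part files to be finalized out of the in-progress state:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class ParquetSinkSketch {
    // Assumes a Request class readable by Avro reflection and an existing DataStream<Request>.
    public static void attach(DataStream<Request> stream, String outputPath) {
        StreamingFileSink<Request> sink = StreamingFileSink
            .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(Request.class))
            .withBucketAssigner(new DateTimeBucketAssigner<>())  // time-based buckets, as in the Scala snippet
            .withBucketCheckInterval(5000L)                      // inspect buckets every 5 s
            .build();
        stream.addSink(sink);
    }
}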

On Sun, Dec 9, 2018 at 2:13 PM Avi Levi  wrote:

> Hi,
> I am trying to read from kafka and write to parquet. But I am getting
> thousands of ".part-0-0in progress..." files (and counting ...)
> is that a bug or am I doing something wrong?
>
> object StreamParquet extends App {
>   implicit val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(100)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setParallelism(1)
> val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new
> AddressSchema(), consumerProperties)
>   val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
>   val outputPath = "streaming_files"
>   val sink = StreamingFileSink.forBulkFormat(
> new Path(outputPath),
> ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
>   stream.addSink(sink)
>   env.execute("Write to file")
> }
>
>


Re: Failed to resume job from checkpoint

2018-12-10 Thread Stefan Richter
Hi,

good that you found the cause of the problem in your configuration setting, but 
unfortunately I think I cannot yet follow your reasoning. Can you explain why 
the code would fail for a “slow” HDFS? If no local recovery is happening (this 
means: job failover, with local recovery activated)  the job will always first 
download all files from HDFS to your local disk. After that, it will hard link 
the file on local disk to another directory. I would assume that all HDFS 
problems like slowdowns will show in the part that is downloading the files to 
local disk. But your exceptions comes after that, when the file supposedly was 
already copied. So I don’t understand how you think that this is connected, can 
you please explain it in more detail? 

For your second question, Flink currently assumes that your HDFS (or whatever 
checkpoint filesystem you use) is stable, highly available storage and that 
files do not “get lost”. It can tolerate temporary outages through multiple 
restart attempts, but your setup of the checkpoint directory should prevent 
data loss.

Best,
Stefan 

> On 9. Dec 2018, at 14:05, Ben Yan  wrote:
> 
> hi,
> 
> 1. I took a closer look at the relevant code about 
> RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath. And I 
> did some verification. I found this problem is likely related to file system 
> connection restrictions. At first I was worried that my hdfs would be 
> overloaded due to a large number of connections, so I configured the 
> following related parameters:
> 
> fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
> fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
> 
> Since I configured the above configuration, this problem has begun to appear! 
> When I removed the above configuration, the problem disappeared.I think that 
> when flink is configured with file system connection restrictions, the 
> mechanism for recovering from checkpoint needs to be improved. Jobs can 
> recover from checkpoints more slowly with file system connection 
> restrictions, rather than failing directly because of the above exceptions.
> 
> 2. After the job has been running for a long time, if the state data stored 
> in the state backend (such as hdfs) is lost for some reason, what other ways 
> can quickly restore this state data back quickly, for example, through some 
> kind of offline task is to quickly recover state data from offline data, so 
> that streaming jobs can be launched from this recovered state data.
> 
> Best
> Ben
> 
> Ben Yan (yan.xiao.bin.m...@gmail.com) wrote on Saturday, December 8, 2018 at 11:08 AM:
> I hava already tested it.
> 
> [root@node ~]#ll 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 32
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> blobStore-273cf1a6-0f98-4c86-801e-5d76fef66a58
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> blobStore-992562a5-f42f-43f7-90de-a415b4dcd398
> drwx--x---  4 yarn hadoop 4096 Dec  8 02:29 
> container_e73_1544101169829_0038_01_59
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> flink-dist-cache-6d8dab0c-4034-4bbe-a9b9-b524cf6856e3
> drwxr-xr-x  8 yarn hadoop 4096 Dec  8 02:29 
> flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0
> drwxr-xr-x  4 yarn hadoop 4096 Dec  8 02:29 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
> 
> And the derectory "flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0" does not 
> exist.
> 
> [root@node ~]#ll 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 12
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53 
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
> 
> Ben Yan (yan.xiao.bin.m...@gmail.com) wrote on Saturday, December 8, 2018 at 12:23 AM:
> Thank you for your advice! I will check this out next, and I will sync the 
> information at any time with new progress.
> 
> Stefan Richter wrote on Saturday, December 8, 2018 at 12:05 AM:
> I think then you need to investigate what goes wrong in 
> RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath. If you 
> look at the code it lists the files in a directory and tries to hard link 
> them into another directory, and I would only expect to see the mentioned 
> exception if the original file that we try to link does not exist. However, 
> imo it must exist because we list it in the directory right before the link 
> attempt and Flink is not delete anything in the meantime. So the question is, 
> why can a file that was listed before just suddenly disappear when it is hard 
> linked? The only potential problem could be in the path transformations and 
> concatenations, but they look good to 

Re: number of files in checkpoint directory grows endlessly

2018-12-10 Thread Bernd.Winterstein
Hi Andrey,

I checked our code again. We are indeed using timers for dynamic routing 
updates. It gets triggered every five minutes!
This must be the reason for the five minutes pattern in the remaining .sst 
files.

Do I understand it correctly that the files remain because they are too small 
for compaction?

A dynamic ttl column-family option has been introduced to solve this problem. 
Files (and, in turn, data) older than TTL will be scheduled for compaction when 
there is no other background work. This will make the data go through the 
regular compaction process and get rid of old unwanted data. This also has the 
(good) side-effect of all the data in the non-bottommost level being newer than 
ttl, and all data in the bottommost level older than ttl. Note that it could 
lead to more writes as RocksDB would schedule more compactions.

How do I enable this “ttl column-family” option? I can’t find any hints.

Regards,

Bernd
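A hedged sketch of where custom RocksDB column-family options are plugged in on the Flink side (Flink 1.6 API). The per-column-family TTL described in the RocksDB wiki corresponds to ColumnFamilyOptions#setTtl, which only exists in newer RocksJava releases and may not be available in the RocksDB build bundled with Flink 1.6, so treat that part as an assumption to verify; the factory below only demonstrates the hook and one compaction-related knob that is available:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CompactionTuningOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;  // keep the DB-level options from the predefined profile
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Turn dynamic level sizing off again so small files fall under the regular
        // level targets; on newer RocksJava one could additionally call
        // currentOptions.setTtl(...) here (assumption: not present in Flink 1.6's RocksDB).
        return currentOptions.setLevelCompactionDynamicLevelBytes(false);
    }
}

// Registered on the backend, e.g.:
//   RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true);
//   backend.setOptions(new CompactionTuningOptionsFactory());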

From: Andrey Zagrebin [mailto:and...@data-artisans.com]
Sent: Thursday, December 6, 2018 18:47
To: Winterstein, Bernd
Cc: myas...@live.com; k.klou...@data-artisans.com; user@flink.apache.org; 
s.rich...@data-artisans.com; t...@data-artisans.com; step...@data-artisans.com
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Does this directory contains approximately 300 files on task executor?

The 'transactionState' seems to be ok and compacted. It takes most of data 
size, up to 100Mb.

Do you use any Flink timers? They are also kept in RocksDb and take very 
little, just about 300Kb.

It might need more investigation but my idea is the following.
The column of Flink user timers has a lot of small files with just 1-2 new 
records, checkpointed all the time.  Most of the small file contents is just 
meta info. It seems to include both creation and deletion entries. As timers 
are always new entries, the sst files either do not overlap or too small. As 
result they are not merged upon compaction but just forwarded to the last level 
[1]. Setting dynamic compaction in SPINNING_DISK_OPTIMIZED_HIGH_MEM makes its 
target size what it is at any time, basically disabling compaction for it. The 
files can stay very long. I cannot check for all 300 files but it seems for 
19/26 file removal has not happened as it does for ‘transactionState'. The 
RocksDb suggest to use some other TTL concept to force compaction [2]. The 
compaction can be also triggered manually from time to time but this is not 
part of standard Flink distribution. The TtlDb cleans up expired entries also 
only during compaction.

Best,
Andrey

[1] https://github.com/facebook/rocksdb/blob/master/db/compaction.cc#L332
[2] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#ttl


On 6 Dec 2018, at 12:53, bernd.winterst...@dev.helaba.de wrote:

Seems that some file deletion is disabled by default. There are some log 
entries in the file

From: Andrey Zagrebin [mailto:and...@data-artisans.com]
Sent: Thursday, December 6, 2018 12:07
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Thanks for sharing the code.

I understand your TTL requirement. It definitely makes sense for your 
application.
My recommendation is still to try running job with original backend without 
TtlDb modification to narrow down the problem and understand where these small 
files come from. They might look like some meta information which seems to 
accumulate over time.

Could you also investigate temporary local directories in task executors?
By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp 
in Linux) or there is also a config option for them [1].
Do you see there explosion of similar files? They can be named differently. The 
interesting would be whether the smaller files end with ‘.sst’ or not.

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#io-tmp-dirs



On 6 Dec 2018, at 09:16, bernd.winterst...@dev.helaba.de wrote:

Hi Andrey
We need the TTL feature, because we store about 10 million state entries per 24 
hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with 
timers due to bad checkpoint performance, so I switched to TtlDB.

It seems that per minute files are only available for the last incremental 
period. After this time the files have five minute distance.

As for the chk-* directories, there is only one directory left.

I attached the code for the options and the KeyedStateBackend and also one of 
the shared files.

Many thanks for your help.


Bernd

cat 
/appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3

¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ


Re: Flink - excessive amount of yarn container requests for versions > 1.4.0

2018-12-10 Thread sohi mankotia
Thanks for your information.

So there is no solution for this as of now ?

On Mon 10 Dec, 2018, 1:16 PM Shuyi Chen wrote:
> We've seen a similar issue in our production, you can refer to this JIRA (
> https://issues.apache.org/jira/browse/FLINK-10848) for more detail.
>
> Shuyi
>
> On Sun, Dec 9, 2018 at 11:27 PM sohimankotia 
> wrote:
>
>> Hi ,
>>
>> While running Flink streaming job it is requesting more than specified
>> resources from yarn. I am giving 17 TMs but it is requesting more than 35
>> containers from yarn .
>>
>> This is happening for all versions greater than 1.4.0.
>>
>>
>> Attaching JM logs.
>>
>> logs.zip
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/logs.zip>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>