Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther
and especially the comments there [1]. Regards, Timo [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala Am 2/6/18 um 1:36 PM schrieb Timo Walther: Hi Henri, I try to answer your

Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther
Hi Henri, I try to answer your question: 1) You are right, SELECT DISTINCT should not need a retract stream. Internally, this is translated into an aggregation without an aggregate function call. So this definitely needs improvement. 2) The problem is that SELECT DISTINCT is not officially

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Timo Walther
is? Since I'm not familiar with the details below the state implementations, it will be great if you can share more technical details or some references to me. Thank you! Best Regards, Tony Wei 2018-02-06 15:24 GMT+08:00 Timo Walther <twal...@apache.org <mailto:twal...@apache.org>&g

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Timo Walther
Hi Tony, not having a proper equals() method might work for a keyBy() (partitioning operation) but it can lead to unexpected side effects when dealing with state. If not now, then maybe in the future. For example, heap-based state uses a hash table data structure such that your key might
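The hash-table issue Timo describes can be shown with plain Java: a key type without equals()/hashCode() silently misses lookups in any hash-based structure, which is exactly what heap state uses. The class and field names below are illustrative, not from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical POJO key; Flink's POJO rules require public fields (or
// getters/setters) and a no-arg constructor.
public class DeviceKey {
    public String deviceId;
    public int shard;

    public DeviceKey() {}

    public DeviceKey(String deviceId, int shard) {
        this.deviceId = deviceId;
        this.shard = shard;
    }

    // Without these overrides, two logically identical keys would miss each
    // other in hash-based structures such as heap-backed keyed state.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DeviceKey)) return false;
        DeviceKey k = (DeviceKey) o;
        return shard == k.shard && Objects.equals(deviceId, k.deviceId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(deviceId, shard);
    }

    public static void main(String[] args) {
        Map<DeviceKey, Long> state = new HashMap<>();
        state.put(new DeviceKey("a", 1), 42L);
        // The lookup with an equal-but-distinct instance succeeds only
        // because equals/hashCode are overridden.
        System.out.println(state.get(new DeviceKey("a", 1)));
    }
}
```

With the default identity-based equals() inherited from Object, the same get() would return null.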

Re: Maintain heavy hitters in Flink application

2018-02-01 Thread Timo Walther
Hi, I think it would be easier to implement a custom key selector and introduce some artificial key that spreads the load more evenly. This would also allow you to use keyed state. You could use a ProcessFunction and set timers to define the "every now and then". Keyed state would also ease
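The "artificial key" idea can be sketched as key salting: append a random bucket suffix so one hot key is spread over N subkeys, then strip the suffix again when merging partial results. All names here are illustrative; in a Flink job this logic would live inside a KeySelector passed to keyBy(...), with a second keyBy on the original key to combine the partial aggregates.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of spreading a heavy-hitter key over `fanOut` artificial subkeys.
public class SaltedKey {

    // e.g. "hotKey" -> "hotKey#3" for fanOut = 8
    public static String salt(String key, int fanOut) {
        int bucket = ThreadLocalRandom.current().nextInt(fanOut);
        return key + "#" + bucket;
    }

    // Recover the original key when merging the partial results downstream.
    public static String original(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }
}
```

The trade-off: each original key now produces up to fanOut partial results that a second aggregation step must merge.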

Re: How to access JobManager and TaskManager

2018-01-31 Thread Timo Walther
I don't have this property in my local running Flink cluster. Which Flink version and deployment are you using? Are you sure this property is not set in your flink-conf.yaml? Regards, Timo Am 1/31/18 um 7:51 AM schrieb xiatao123: In the web UI, I can see these information under JobManager.

Re: RichAsyncFunction in Scala

2018-01-31 Thread Timo Walther
Hi Wouter, you could use the Java classes as a workaround. If you take a look at the implementation [1], you will see that Scala only wraps the Java classes. I think you can implement the same. You can convert your result stream back into a Scala stream by calling `new

Re: Ingesting a table from a retraction stream !!

2018-01-26 Thread Timo Walther
Hi Puneet, I'm very sure that this feature will be included in upcoming releases. However, such an interface has to be carefully designed because built-in operators assume that only records that have been emitted previously are retracted. Maybe it will be part of Flink 1.6 but 1.7 should

Re: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-23 Thread Timo Walther
I am getting this issue while reading from the database using the JDBC connector. Can you guide me on how to read it as a string, or maybe map it to another type while reading from the database? On Mon, Jan 22, 2018 at 4:12 PM, Timo Walther <twal...@apac

Re: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-22 Thread Timo Walther
Puneet Kinra: I am getting this issue while reading from the database using the JDBC connector. Can you guide me on how to read it as a string, or maybe map it to another type while reading from the database? On Mon, Jan 22, 2018 at 4:12 PM, Timo Walther <twal...@apache.org <mailt

Re: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-22 Thread Timo Walther
Hi Puneet, Flink SQL only supports java.sql.Timestamp. You need to convert it in a user-defined function or map function accordingly. Regards, Timo Am 1/22/18 um 11:38 AM schrieb Puneet Kinra: Hi I am getting the above error when deployed to the cluster, trying to set the System
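A minimal, JDK-only sketch of the conversion Timo suggests doing before the value reaches Flink SQL. With the Oracle driver on the classpath you would instead call `((oracle.sql.TIMESTAMP) value).timestampValue()` inside a map function or UDF — that method name is an assumption about the driver API, so verify it against your ojdbc version.

```java
import java.sql.Timestamp;

// Convert a database value of unknown timestamp type into the
// java.sql.Timestamp that Flink SQL expects.
public class ToSqlTimestamp {
    public static Timestamp convert(Object dbValue) {
        if (dbValue instanceof Timestamp) {
            return (Timestamp) dbValue;
        }
        // Fallback: parse the JDBC string form "yyyy-mm-dd hh:mm:ss[.f...]".
        // Vendor types like oracle.sql.TIMESTAMP render in this form; a
        // direct driver API call is preferable when the jar is available.
        return Timestamp.valueOf(dbValue.toString());
    }
}
```

In a Flink job this method would be called from a MapFunction applied to the JDBC source before registering the stream as a table.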

Re: Multiple Elasticsearch sinks not working in Flink

2018-01-18 Thread Timo Walther
Hi Teena, what happens if you replace the second sink with a non-ElasticSearchSink? Is there the same result? Is the data read from the KafkaTopic2? We should determine which system is the bottleneck. Regards, Timo Am 1/18/18 um 9:53 AM schrieb Teena Kappen // BPRISE: Hi, I am running

Re: Which collection to use in Scala case class

2018-01-18 Thread Timo Walther
I filed a more specific issue for this: https://issues.apache.org/jira/browse/FLINK-8451 Am 1/18/18 um 10:47 AM schrieb shashank agarwal: @Chesnay , @Timo, yes it's simple case class which i am using with java.util.List and one case class with Option and Seq. With CEP. I have filed Jira

Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

2018-01-18 Thread Timo Walther
I filed an issue for this: https://issues.apache.org/jira/browse/FLINK-8451 Am 1/12/18 um 4:40 PM schrieb Seth Wiesman: Here is the stack trace: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at

Re: Submitting jobs via Java code

2018-01-18 Thread Timo Walther
/2018 09:39, "Timo Walther" <twal...@apache.org <mailto:twal...@apache.org>> ha scritto: Hi Luigi, can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit wh

Re: Which collection to use in Scala case class

2018-01-17 Thread Timo Walther
The issue of Scala case classes that can not be restored is a known issue in Flink 1.4. We need to investigate if it can be fixed easily. @Shashank: Could you give us a little reproducible example? Just a case class with a java.util.List in it? @Gordon: Is there a Jira issue for this? I

Re: Java types

2018-01-12 Thread Timo Walther
hould not use "org.apache.flink.streaming.api.scala.DataStream" but the Java one. I rewrote the class in Java. Thats why I am so confused Am 1/11/18 um 10:07 AM schrieb Timo Walther: Hi Boris, each API is designed language-specific so they might not always be the same. Scala

Re: Java types

2018-01-11 Thread Timo Walther
Hi Boris, each API is designed language-specific so they might not always be the same. Scala has better type extraction features and lets you write code very precisely. Java sometimes requires more code to achieve the same. You don't need to specify the type in .flatMap() explicitly. It will

Re: is it possible to convert "retract" datastream to table

2018-01-10 Thread Timo Walther
Hi Yan, there are no table source interfaces that allow for creating a retract stream directly yet. Such an interface has to be carefully designed because built-in operators assume that only records that have been emitted previously are retracted. However, they are planned for future Flink

Re: Two operators consuming from same stream

2018-01-03 Thread Timo Walther
Hi Tovi, I think your code without duplication performs two separate shuffle operations whereas the other code only performs one shuffle. Further latency impacts might be due to the overhead involved in maintaining the partitioning for a keyed stream/key groups and switching key contexts in

Re: keyby() issue

2018-01-02 Thread Timo Walther
Hi Jinhua, did you check the key group assignments? What is the distribution of "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data? This also depends on the hashCode of the output of your KeySelector. keyBy should handle high traffic well, but it is designed for key
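The distribution check Timo suggests can be approximated offline. Flink internally uses MathUtils.murmurHash over the key's hashCode; the sketch below stays dependency-free and uses plain hashCode() instead, which is enough to reveal skew caused by a bad KeySelector output. Names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Count how many sample keys fall into each of `maxParallelism` groups.
public class KeyGroupSkew {
    public static Map<Integer, Integer> distribution(Iterable<?> keys, int maxParallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Object key : keys) {
            // Math.abs is a simplification (Integer.MIN_VALUE edge case
            // ignored for a sketch); Flink applies murmur hashing on top.
            int group = Math.abs(key.hashCode() % maxParallelism);
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }
}
```

If a handful of groups hold most of the counts on a representative sample, the skew is in the keys themselves, not in Flink's assignment.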

Re: How to stop FlinkKafkaConsumer and make job finished?

2018-01-02 Thread Timo Walther
Hi Arnaud, thanks for letting us know your workaround. I agree that this is a frequently asked topic and important in certain use cases. I'm sure that it will be solved in the near future depending on the priorities. My 2 cents: Flink is an open source project; maybe somebody is willing to

Re: Flink Kafka Consumer stops fetching records

2018-01-02 Thread Timo Walther
Hi Teena, could you tell us a bit more about your job. Are you using event-time semantics? Regards, Timo Am 1/2/18 um 6:14 AM schrieb Teena K: Hi, I am using Flink 1.4 along with Kafka 0.11. My stream job has 4 Kafka consumers each subscribing to 4 different topics. The stream from each

Re: BackPressure handling

2018-01-02 Thread Timo Walther
Hi Vishal, your assumptions sound reasonable to me. The community is currently working on a more fine-grained back pressuring with credit-based flow control. It is on the roadmap for 1.5 [1]/[2]. I will loop in Nico that might tell you more about the details. Until then I guess you have to

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-21 Thread Timo Walther
Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1. For case classes there is also a dedicated cassandra sink (every case class is a Product):

Re: Can not resolve org.apache.hadoop.fs.Path in 1.4.0

2017-12-20 Thread Timo Walther
Libraries such as CEP or the Table API should have the "compile" scope and should be in both the fat and non-fat jar. The non-fat jar should contain everything that is not in flink-dist or your lib directory. Regards, Timo Am 12/20/17 um 3:07 PM schrieb shashank agarwal: Hi, In that

Re: Can not resolve org.apache.hadoop.fs.Path in 1.4.0

2017-12-19 Thread Timo Walther
Hi Shashank, it seems that HDFS is still not in the classpath. Could you quickly explain how I can reproduce the error? Regards, Timo Am 12/19/17 um 12:38 PM schrieb shashank agarwal: yes, it's working fine. now not getting a compile-time error. But when I try to run this on a cluster or

Re: Flink 1.4 with cassandra-connector: Shading error

2017-12-19 Thread Timo Walther
ption e1) { throw new AssertionError("Cannot locate Netty classes in the classpath:" + e1); } } @Chesnay: Should we instead shade into datastax' namespace as shown? This would also make sure to follow the shaded path in that class which, for example, deactivates epoll. Nico

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread Timo Walther
Hi Shashank, the exception you get is a known issue [0] that will be fixed with Flink 1.4.1. We improved the dependency management but it seems this causes some problems with the Cassandra connector right now. So as a workaround you can add netty (version 4.0) to your dependencies. This

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-19 Thread Timo Walther
Hi Shashank, Scala case classes are treated as a special tuple type in Flink. If you want to make a POJO out of it, just remove the "case" keyword and make sure that the class is static (in the companion object). I hope that helps. Timo Am 12/19/17 um 11:04 AM schrieb shashank agarwal:

Re: How flink assigns task slots to the streams of the same app?

2017-12-18 Thread Timo Walther
Hi Jinhua, you need to set a higher parallelism for your application when submitting it. You can also set it via env.setParallelism() or even on an operator level with map.setParallelism(). The number of task slots must be greater than or equal to the parallelism of your application,

Re: Flink 1.4 with cassandra-connector: Shading error

2017-12-18 Thread Timo Walther
Hi Dominik, thanks for reporting your issue. I will loop in Chesnay, who might know more about your problem. There were a lot of dependency changes in 1.4 to make the future more dependency friendly. Maybe this has not been tested properly. Regards, Timo Am 12/18/17 um 3:07 PM schrieb

Re: Watermark in broadcast

2017-12-13 Thread Timo Walther
Hi Seth, are you sure that all partitions of the broadcasted stream send a watermark? processWatermark is only called if a minimum watermark arrived from all partitions. Regards, Timo Am 12/13/17 um 5:10 PM schrieb Seth Wiesman: Hi, How are watermarks propagated during a broadcast

Re: ClassCastException when using RowTypeInfo

2017-12-13 Thread Timo Walther
Hi Madan, this is definitely a bug. The Row type has mostly been added for the Table & SQL API and has not been tested for expression keys. But in general I would use a tuple in your case as they are more efficient. The `registerType` is only necessary for generic types serialized with Kryo. I

Re: netty conflict using lettuce redis client

2017-12-13 Thread Timo Walther
Hi, we just released Flink 1.4.0 [1]. Maybe it is possible for you to upgrade? One of the greatest features is improved classloading and better dependency management. I think this would be the easiest solution for you. Otherwise let us know if you still need help. Regards, Timo [1]

Re: Parallelizing a tumbling group window

2017-12-11 Thread Timo Walther
Hi Colin, unfortunately, selecting the parallelism for parts of a SQL query is not supported yet. By default, tumbling window operators use the default parallelism of the environment. Simple project and select operations have the same parallelism as the inputs they are applied on. I think

Re: Exception when using the time attribute in table API

2017-12-11 Thread Timo Walther
Hi Sendoh, at a first glance this looks like a Maven issue to me. Are you sure you are using a consistent version for both core Flink and flink-table (also a consistent Scala version 2.11)? Maybe you can share your pom.xml with us. It seems that flink-table is a newer version than your Flink

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
https://github.com/kottmann/flink-avro-issue And it is indeed like you suspected, the key is a Utf8, and I pass in a String to the get. But why did that now break with Flink 1.4.0 and runs on Flink 1.3.2? Thanks again! Jörn On Thu, Dec 7, 2017 at 3:57 PM, Timo Walther <twal...@apache.org> wro

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
Can you also check the type of the keys in your map. Avro distinguishes between the String and Utf8 classes. Maybe this is why your key cannot be found. Regards, Timo Am 12/7/17 um 3:54 PM schrieb Timo Walther: Hi Jörn, could you tell us a bit more about your job? Did you import the flink-avro

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
Hi Jörn, could you tell us a bit more about your job? Did you import the flink-avro module? What does the Flink TypeInformation for your Avro type look like using println(ds.getType)? It sounds like a very weird error if the toString() method shows the key. Can you reproduce the error in your

Re: slot group indication per operator

2017-12-05 Thread Timo Walther
Hi Tovi, you are right, it is difficult to check the correct behavior. @Chesnay: Do you know if we can get this information? If not through the Web UI, maybe via REST? Do we have access to the full ExecutionGraph somewhere? Otherwise it might make sense to open an issue for this. Regards,

Re: Share state across operators

2017-12-05 Thread Timo Walther
Hi Shailesh, sharing state across operators is not possible. However, you could emit the state (or parts of it) as a stream element to downstream operators by having a function that emits a type like "Either". Another option would be to use side outputs to send state to
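The "Either"-style element Timo mentions can be sketched as a tiny tagged-union type: downstream operators receive one stream whose elements are either regular data or an emitted state snapshot. Flink actually ships an Either type of its own; this JDK-only version is illustrative and all names are assumptions.

```java
// Minimal Either: carries either a regular element (left) or a piece of
// emitted state (right), so both can travel in one stream.
public class Either<L, R> {
    public final L left;
    public final R right;

    private Either(L left, R right) {
        this.left = left;
        this.right = right;
    }

    public static <L, R> Either<L, R> left(L value) {
        return new Either<>(value, null);
    }

    public static <L, R> Either<L, R> right(R value) {
        return new Either<>(null, value);
    }

    // Simplification: assumes a non-null payload on the chosen side.
    public boolean isLeft() {
        return left != null;
    }
}
```

A downstream operator would branch on isLeft() to either process the data element or update its local view of the upstream state.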

Re: CPU Cores of JobManager

2017-12-05 Thread Timo Walther
which component consumes so much CPU resources? You mean Java Flight Recorder or JITWatch? Or does Flink have its own profiler? https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/application_profiling.html Regards, Yuta On 2017/12/05 18:02, Timo Walther wrote: Hi Yuta, as fa

Re: CPU Cores of JobManager

2017-12-05 Thread Timo Walther
Hi Yuta, as far as I know you cannot assign more cores to a JobManager. Can you tell us a bit more about your environment? How many jobs does the JobManager have to manage? How much heap memory is assigned to the JobManager? Maybe you can use a profiler and find out which component consumes

Re: Non-intrusive way to detect which type is using kryo ?

2017-12-01 Thread Timo Walther
: Hi Timo, Having a utility like that would be great. It'd be even better if it can be executed without having to actually run a cluster. Best regards, Kien On 11/28/2017 3:39 PM, Timo Walther wrote: Hi Kien, at the moment I'm working on some improvements to the type system that

Re: Dataset using several count operator in the same environment

2017-11-29 Thread Timo Walther
Hi Ebru, the count() operator is a very simple utility function that calls execute() internally. If you want to have a more complex pipeline you can take a look at how our WordCount [0] example works. The general concept is to emit a 1 for every record and sum the ones in parallel. If you
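The "emit a 1 and sum" pattern behind WordCount, condensed into plain Java to show the dataflow: each record becomes a (key, 1) pair and the ones are summed per key. In a Flink DataSet job the same shape would be a flatMap emitting Tuple2<String, Integer> followed by groupBy(0).sum(1).

```java
import java.util.HashMap;
import java.util.Map;

// JDK-only sketch of counting via (word, 1) pairs summed per key.
public class EmitOneAndSum {
    public static Map<String, Integer> wordCount(String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) {
            // "emit a 1 for every record and sum" per key
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }
}
```

Unlike count(), this keeps the result inside the pipeline, so several such aggregations can share one execute() call.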

Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Timo Walther
Hi Bart, usually, this error means that your Maven project configuration is not correct. Is your custom class included in the jar file that you submit to the cluster? It might make sense to share your pom.xml with us. Regards, Timo Am 11/29/17 um 2:44 PM schrieb Bart Kastermans: I have

Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther
understanding that correctly? Best, wangsan On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote: Hi Wangsan, currently the timestamps in Flink SQL do not depend on a timezone. All calculations happen on the UTC timestamp. This als

Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther
Hi Wangsan, currently the timestamps in Flink SQL do not depend on a timezone. All calculations happen on the UTC timestamp. This also guarantees that an input with Timestamp.valueOf("XXX") remains consistent when parsing and outputting it with toString(). Regards, Timo Am 11/29/17 um 3:43

Re: Non-intrusive way to detect which type is using kryo ?

2017-11-28 Thread Timo Walther
Hi Kien, at the moment I'm working on some improvements to the type system that will make it easier to tell if a type is a POJO or not. I have some utility in mind like `ensurePojo(MyType.class)` that would throw an exception with a reason why this type must be treated as a generic type.

Re: Flink stress testing and metrics

2017-11-23 Thread Timo Walther
, but for now I'm using 200 bytes, I will try it with the real size later. For the data generator, it is an infinite for loop. Thanks. 2017-11-22 18:11 GMT+01:00 Timo Walther <twal...@apache.org <mailto:twal...@apache.org>>: At a first glance I would say that your data size i

Re: S3 Access in eu-central-1

2017-11-22 Thread Timo Walther
@Patrick: Do you have an advice? Am 11/22/17 um 5:52 PM schrieb domi...@dbruhn.de: Hey everyone, I've been trying for hours to get Flink 1.3.2 (downloaded for hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is hosted in the eu-central-1 region. Everything works fine for other regions.

Re: Flink stress testing and metrics

2017-11-22 Thread Timo Walther
a look at the screen-shot in attach). All other metrics are working. Please help me finding the best way to do the stress testing correctly. Regards, Sadok 2017-11-22 14:52 GMT+01:00 Timo Walther <twal...@apache.org <mailto:twal...@apache.org>>: Hi Sadok, it would be helpful if

Re: How to Create Sample Data from HDFS File using Flink ?

2017-11-22 Thread Timo Walther
Hi, the sampling functions are exposed in org.apache.flink.api.java.utils.DataSetUtils. So you can basically create something like: final HadoopInputFormat inputFormat = HadoopInputs.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, hdfsPath);

Re: Impersonate user for hdfs

2017-11-22 Thread Timo Walther
Hi Vishal, shouldn't it be possible to configure a proxy user via core-site.xml? Flink is also using this XML for HDFS. You can also set the configuration files manually, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs Regards, Timo Am 11/21/17 um

Re: Flink stress testing and metrics

2017-11-22 Thread Timo Walther
Hi Sadok, it would be helpful if you could tell us a bit more about your job. E.g. a skewed key distribution where keys are only sent to one third of your operators cannot use your CPUs' full capabilities. The latency tracking interval is in milliseconds. Can you try if 1000 would fix your

Re: Tooling for resuming from checkpoints

2017-11-22 Thread Timo Walther
Hi Dominik, the Web UI shows you the status of a checkpoint [0], so it might be possible to retrieve the information via REST calls. Usually, you should perform a savepoint for planned restarts. If a savepoint is successful you can be sure to restart from it. Otherwise the platform from

Re: Problem with SQL-API and nested objects in case class

2017-11-21 Thread Timo Walther
Actually, your use case should be doable with Flink's Table & SQL API with some additional UDFs. The API can handle JSON objects if they are valid composite types and you can access arrays as well. The splitting might be a bit tricky in SQL, you could model it simply as a where() clause or

Re: Hive integration in table API and SQL

2017-11-21 Thread Timo Walther
mean there is no way I can make operations, like join, on a streaming table and a batch table ? Best, wangsan On 20 Nov 2017, at 9:15 PM, Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote: Timo

Re: Hive integration in table API and SQL

2017-11-20 Thread Timo Walther
Hi Wangsan, yes, the Hive integration is limited so far. However, we provide an external catalog feature [0] that allows you to implement custom logic to retrieve Hive tables. I think it is not possible to do all your operations in Flink's SQL API right now. For now, I think you need to

Re: Problem with SQL-API and nested objects in case class

2017-11-20 Thread Timo Walther
Hi, thanks for writing on the mailling list. I could reproduce your error and opened an issue for it (https://issues.apache.org/jira/browse/FLINK-8107). UNNEST currently only supports unnesting and joining an array of the same relation. However joining of two relations will be supported soon

Re: [Flink] How to Converting DataStream to Dataset or Table?

2017-11-16 Thread Timo Walther
Hi Richard, in general it is difficult to sort a DataStream that is potentially neverending. However, if you use Flink's event-time semantics with watermarks that indicate that your stream is complete until a certain point you can sort it. The Table API will offer a sort option in 1.4

Re: Help on RowTypeInfo?

2017-10-28 Thread Timo Walther
Hi, the translate() method is an internal method. You can use "toRetractStream(table, Row.class)" or "toAppendStream(table, Row.class)" to convert your table into a stream. Make sure to use the correct StreamTableEnvironment for your API: org.apache.flink.table.api.java.StreamTableEnvironment

Re: Incompatible types of expression and result type.

2017-10-24 Thread Timo Walther
Hi, I found the problem in your implementation. The Table API program is correct. However, the DataStream program that you construct in your TableSource has a wrong type. Whenever you use a Row type, you need to specify the type either by implementing ResultTypeQueryable or in your

Re: How to test new sink

2017-10-24 Thread Timo Walther
. On 23 Oct 2017, at 17:51, Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote: Hi Rinat, using one of the Flink test utilities is a good approach to test your custom operators. But of course these classes might change in the future. First of all, Flink is a

Re: Sample project on real time ingestion of more than 1Billion events at a time

2017-10-23 Thread Timo Walther
Hi Deepak, actually, every Flink example program can scale up to millions of events and more. The Flink APIs are designed to abstract the business logic from the parallelism. You just need to implement the interfaces that Flink provides. If you are interested in an example program, I can

Re: How to test new sink

2017-10-23 Thread Timo Walther
Hi Rinat, using one of the Flink test utilities is a good approach to test your custom operators. But of course these classes might change in the future. First of all, Flink is an open source project so you can just copy the required classes. However, it should be possible to use the Flink

Re: How can I create a savepoint if I have Flink running in containers?

2017-10-23 Thread Timo Walther
Hi, I'm not a deployment expert but I think creating a savepoint should still be doable through the CLI client. The Flink JobManager and TaskManager just run in the containers and the CLI connects to a JobManager. I will loop in someone more familiar with deployment. We should definitely

Re: ResultPartitionMetrics

2017-10-23 Thread Timo Walther
Hi Aitozi, I will loop in people who are more familiar with the network stack and metrics. Maybe this is a bug? Regards, Timo Am 10/22/17 um 4:36 PM schrieb aitozi: Hi, i see in version 1.3, it adds the ResultPartitionMetrics with issue: https://issues.apache.org/jira/browse/FLINK-5090

Re: Flink 1.4 release timeline

2017-10-23 Thread Timo Walther
Hi Moiz, the community is working hard to fix the last blockers for the release. The feature freeze should happen end of this month. After that we will test the release over some weeks. I think you can expect the 1.4 release end of November. Feel free to help us with the release by testing

Re: The TypeInformation about Table API

2017-10-23 Thread Timo Walther
Hi Han, generally, Flink is a strongly typed system. I think the easiest way to handle a dynamic schema is to read your JSON as a String. You can then implement your own ScalarFunction (or in this case also a TableFunction) [1] and use any JSON parsing library in this function for

Re: kafka consumer parallelism

2017-10-02 Thread Timo Walther
Hi, I'm not a Kafka expert but I think you need to have more than 1 Kafka partition to process multiple documents at the same time. Make also sure to send the documents to different partitions. Regards, Timo Am 10/2/17 um 6:46 PM schrieb r. r.: Hello I'm running a job with "flink run -p5"

Re: Session Window set max timeout

2017-10-02 Thread Timo Walther
Hi, I would recommend to implement your custom trigger in this case. You can override the default trigger of your window: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#triggers This is the interface where you can control the triggering:

Re: Flink Watermark and timing

2017-10-02 Thread Timo Walther
Hi Björn, I don't know if I get your example correctly, but I think your explanation "All events up to and equal to watermark should be handled in the previous window" is not 100% correct. Watermarks just indicate the progress ("until here we have seen all events with lower timestamp than

Re: How to deal with blocking calls inside a Sink?

2017-10-02 Thread Timo Walther
Hi Federico, would it help to buffer events first and perform batches of insertions for better throughput? I saw some similar work recently here: https://tech.signavio.com/2017/postgres-flink-sink But I would first try the AsyncIO approach, because actually this is a use case it was made
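The buffering idea can be sketched as a small batching helper: collect elements and hand them to the blocking insert in batches instead of one call per record. All names are illustrative; in a real Flink SinkFunction you would also flush on checkpoint and in close() so no buffered rows are lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Batches elements before handing them to a blocking insert,
// e.g. one JDBC batch statement per flush.
public class BatchingBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> flushFn;

    public BatchingBuffer(int batchSize, Consumer<List<T>> flushFn) {
        this.batchSize = batchSize;
        this.flushFn = flushFn;
    }

    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() {
        if (!buffer.isEmpty()) {
            flushFn.accept(new ArrayList<>(buffer)); // copy, then clear
            buffer.clear();
        }
    }
}
```

A time-based flush (e.g. via a scheduled timer) is usually added as well, so a slow trickle of events does not sit in the buffer indefinitely.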

Re: Enriching data from external source with cache

2017-10-02 Thread Timo Walther
Hi Derek, maybe the following talk can inspire you, how to do this with joins and async IO: https://www.youtube.com/watch?v=Do7C4UJyWCM (around the 17th min). Basically, you split the stream and wait for an Async IO result in a downstream operator. But I think having a transient guava cache

Re: Windowing isn't applied per key

2017-10-02 Thread Timo Walther
Hi Marcus, from a first glance your pipeline looks correct. It should not be executed with a parallelism of one, if not specified explicitly. Which time semantics are you using? If it is event-time, I would check your timestamps and watermarks assignment. Maybe you can also check in the web

Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-25 Thread Timo Walther
Hi Federico, I think going through a Storm compatibility layer could work, but did you think about using the flink-jdbc connector? That should be the easiest solution. Otherwise I think it would be easier to quickly implement your own SinkFunction. It is just one method that you have to

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

2017-09-25 Thread Timo Walther
Hi, I also replied to your Stackoverflow question. I think the problem is that BillCount has the wrong type and is therefore treated as one single black box. Haohui's suggestion will not work because the row type needs information about the fields. The easiest thing is to figure out why

Re: Flink 1.2.1 JobManager Election Deadlock

2017-09-07 Thread Timo Walther
Thanks for informing us. As far as I know, we were not aware of any deadlock in the JobManager election. Let's hope that the updated Curator version fixed the problem. We will definitely keep an eye on this. Feel free to contact the dev@ mailing list, if the problem still exists in 1.3.2.

Re: MapState Default Value

2017-09-07 Thread Timo Walther
I will loop in Stefan, who might know the answer. Am 07.09.17 um 02:10 schrieb Navneeth Krishnan: Hi, Is there a reason behind removing the default value option in MapStateDescriptor? I was using it in the earlier version to initialize guava cache with loader etc and in the new version by

Re: Exception when using keyby operator

2017-09-07 Thread Timo Walther
Hi Sridhar, according to the exception, your "meEvents" stream is not a POJO. You can check that by printing "meEvents.getType()". In general, you can always check the log for such problems. There should be something like: 14:40:57,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor

Re: Flink on AWS EMR Protobuf

2017-09-07 Thread Timo Walther
I'm not sure if this is a Flink issue. It popped up on other non-Flink projects as well: http://community.cloudera.com/t5/Storage-Random-Access-HDFS/map-red-over-hbase-in-cdh-5-7/td-p/43902 I would definitely check your dependencies. This looks like conflicting versions in your classpaths.

Re: Question about Flink internals

2017-09-07 Thread Timo Walther
Hi Junguk, I try to answer your questions, but also loop in Ufuk who might know more about the network internals: 1. Yes, every operator/operator chain has a "setParallelism()" method to specify the parallelism. The overall parallelism of the job can be set when submitting a job. The

Re: datastream.print() doesn't works

2017-08-29 Thread Timo Walther
Don't forget to call env.execute() at the end and make sure you have configured your logger correctly. Regards, Timo Am 29.08.17 um 14:59 schrieb Chesnay Schepler: The easiest explanation is that there is nothing to print. Since print statements within the select function don't appear in the

Re: PageRank - 4x slower then Spark?!

2017-08-23 Thread Timo Walther
You could enable object reuse [0] if your application allows that. Also adjusting the managed memory size [1] can help. Are you using Flink's graph library Gelly? [0] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#object-reuse-enabled [1]

Re: Serialization problem: Using generic that extends a class on POJO.

2017-08-15 Thread Timo Walther
Am 14.08.17 um 17:24 schrieb Timo Walther: Hi Ido, at first glance, I could not find any problem in your code. So it might be a bug. The "environment.registerType()" is not needed in your case, because you have no generic types. I will have a closer look at it tomorrow. Regards

Re: Serialization problem: Using generic that extends a class on POJO.

2017-08-14 Thread Timo Walther
Hi Ido, at first glance, I could not find any problem in your code. So it might be a bug. The "environment.registerType()" is not needed in your case, because you have no generic types. I will have a closer look at it tomorrow. Regards, Timo Am 14.08.17 um 16:35 schrieb Ido Bar Av:

Re: Test example and lambdas

2017-08-07 Thread Timo Walther
Hi, Row is a very special data type, because Flink cannot extract the field types automatically based on Java generics. By default it is serialized by Kryo; you need to specify the field types using Types.ROW(Types.STRING, ...) and pass this information in your `.returns()` methods instead of

Re: Eventime window

2017-08-02 Thread Timo Walther
after 10 seconds irrespective of whether a new message arrives or not. Thanks, Govind On Aug 2, 2017, at 6:56 AM, Timo Walther <twal...@apache.org> wrote: Hi Govind, if the window is not triggered, this usually indicates that your timestamp a

Re: Eventime window

2017-08-02 Thread Timo Walther
Hi Govind, if the window is not triggered, this usually indicates that your timestamp and watermark assignment is not correct. According to your description, I don't think that you need a custom trigger/evictor. How often do events arrive from one device? There must be another event from the

Re: Storing POJO's to RocksDB state backend

2017-08-02 Thread Timo Walther
Hi Biplob, Flink is shipped with its own serializers. POJOs and other data types are analyzed automatically. Kryo is only the fallback option if your class does not meet the POJO criteria (see [1]). Usually, all serialization/deserialization to e.g. RocksDB happens internally and the user
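The POJO criteria mentioned above can be sketched in plain Java (the class and fields below are made up for illustration): the class must be public, have a public no-argument constructor, and every field must be public or reachable via conventional getters/setters.

```java
// Sketch of a class that would satisfy Flink's POJO criteria, so Flink's
// own POJO serializer (not the Kryo fallback) would be used for it.
public class SensorReading {
    public String deviceId;          // public field: fine
    private double temperature;      // private field with getter/setter: fine

    public SensorReading() {}        // public no-arg constructor: required

    public double getTemperature() { return temperature; }
    public void setTemperature(double t) { this.temperature = t; }
}
```

A class missing the no-arg constructor, or with a private field lacking a getter/setter pair, would silently fall back to Kryo serialization instead.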

Re: How can I set charset for flink sql?

2017-07-25 Thread Timo Walther
Hi, currently Flink does not support this charset in a LIKE expression. This is due to a limitation in the Apache Calcite library. Maybe you can open an issue there. The easiest solution for this is to implement your own scalar function, that does a `string.contains("")`. Here you can
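The suggested workaround boils down to plain `String.contains`, which is charset-agnostic, inside a user-defined scalar function. A minimal plain-Java sketch of just that matching logic (the sample strings are made up; wiring it into a flink-table `ScalarFunction` is left out):

```java
public class ContainsDemo {
    // The core of the suggested scalar function: a substring test that
    // works for any Unicode text, unlike the limited LIKE expression.
    static boolean containsMatch(String haystack, String needle) {
        return haystack.contains(needle);
    }

    public static void main(String[] args) {
        System.out.println(containsMatch("价格表 price list", "价格")); // true
        System.out.println(containsMatch("价格表 price list", "数量")); // false
    }
}
```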

Re: AVRO Union type support in Flink

2017-07-19 Thread Timo Walther
Hi Vishnu, I took a look into the code. Actually, we should support it. However, those types might be mapped to Java Objects that will be serialized with our generic Kryo serializer. Have you tested it? Regards, Timo Am 19.07.17 um 06:30 schrieb Martin Eden: Hey Vishnu, For those of us

Re: Reading static data

2017-07-13 Thread Timo Walther
Hi Mohit, do you plan to implement a batch or streaming job? If it is a streaming job: You can use a connected stream (see [1], Slide 34). The static data is one side of the stream that could be updated from time to time and will always propagated (using a broadcast()) to all workers that do

Re: Read configuration instead of hard code

2017-07-13 Thread Timo Walther
Hi Desheng, Flink programs are defined in a regular Java main() method. They are executed on the Flink Client (usually the JobManager) when submitted; you can add arbitrary additional logic (like reading a file from an NFS) to the code. After retrieving the Kafka info you can pass it to the
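That pattern can be sketched with `java.util.Properties` (the property names and values are made up; a real job would load from a file path on NFS and hand the values to e.g. a Kafka consumer when building the job graph):

```java
import java.io.StringReader;
import java.util.Properties;

public class ConfigDemo {
    // Load connection info in main() before the job graph is built.
    // Here we parse from a string so the sketch is self-contained; a real
    // job would use props.load(new FileInputStream("/nfs/path/job.properties")).
    static Properties load(String content) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(content));
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = load("bootstrap.servers=broker1:9092\ngroup.id=my-job\n");
        // These values can now be passed into the Kafka source configuration.
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```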

Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-13 Thread Timo Walther
Hi Fabian, I loop in Gordon. Maybe he knows what's happening here. Regards, Timo On 13.07.17 at 13:26, Fabian Wollert wrote: Hi everyone, I'm trying to make use of the new Elasticsearch Connector

Re: About nodes number on Flink

2017-06-26 Thread Timo Walther
If you really want to run one operation per node, you start 1 TaskManager with 1 slot on every node. For each operation you set a new chain and a new slot sharing group. Timo On 23.06.17 at 15:03, AndreaKinn wrote: Hi Timo, thanks for your answer. I think my elaboration are not too much

Re: About nodes number on Flink

2017-06-23 Thread Timo Walther
Hi Andrea, the number of nodes usually depends on the work that you do within your Functions. E.g. if you have a computation-intensive machine learning library in a MapFunction that takes 10 seconds per element, it might make sense to parallelize this in order to increase your throughput. Or

Re: Stream sql example

2017-06-09 Thread Timo Walther
Hi David, I think the problem is that the type of the DataStream produced by the TableSource does not match the type that is declared in `getReturnType()`. A `MapFunction` is always a generic type (because Row cannot be analyzed). A solution would be that the mapper
