[jira] [Created] (FLINK-6272) Rolling file sink saves incomplete lines on failure
Jakub Nowacki created FLINK-6272:

Summary: Rolling file sink saves incomplete lines on failure
Key: FLINK-6272
URL: https://issues.apache.org/jira/browse/FLINK-6272
Project: Flink
Issue Type: Bug
Components: filesystem-connector, Streaming Connectors
Affects Versions: 1.2.0
Environment: Flink 1.2.0, Scala 2.11, Debian GNU/Linux 8.7 (jessie), CDH 5.8, YARN
Reporter: Jakub Nowacki

We have a simple pipeline with a Kafka source (0.9), which transforms data and writes to a rolling file sink; it runs on YARN. The sink is a plain HDFS sink with a StringWriter, configured as follows:

{code:java}
val fileSink = new BucketingSink[String]("some_path")
fileSink.setBucketer(new DateTimeBucketer[String]("-MM-dd"))
fileSink.setWriter(new StringWriter())
fileSink.setBatchSize(1024 * 1024 * 1024) // this is 1 GB
{code}

Checkpointing is on. Both the Kafka source and the file sink in theory come with an [exactly-once guarantee|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html]. On failure, some files which appear to be complete (not {{in_progress}} files or the like, but under 1 GB and confirmed to have been created at the time of the failure) end up with their last line cut off. In our case this is visible because we save the data as line-by-line JSON, so the truncation produces an invalid JSON line. It does not happen on every failure, but I have noticed at least 3 incidents like this.

Also, I am not sure if it is a separate bug, but we see some data duplication from Kafka in this case: after the pipeline is restarted, a number of messages come out of the Kafka source that had already been saved in the previous file. We can tell the messages are duplicated because they have the same data but a different timestamp, which is added within the Flink pipeline. In theory this should not happen, as the sink and source have an [exactly-once guarantee|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html].
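For context, a minimal sketch (not from the original report) of how such a pipeline is typically wired together, assuming the Flink 1.2 {{FlinkKafkaConsumer09}} and {{BucketingSink}} APIs; the topic name, broker address, bucket format, and checkpoint interval below are made up:

{code:java}
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaToHdfsJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing must be enabled for the BucketingSink to finalize files on checkpoints.
    env.enableCheckpointing(60 * 1000) // hypothetical 1-minute interval

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka:9092") // hypothetical broker
    kafkaProps.setProperty("group.id", "flink-pipeline")      // hypothetical group id

    val source = env.addSource(
      new FlinkKafkaConsumer09[String]("some_topic", new SimpleStringSchema(), kafkaProps))

    val fileSink = new BucketingSink[String]("some_path")
    fileSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd")) // daily buckets
    fileSink.setWriter(new StringWriter())
    fileSink.setBatchSize(1024 * 1024 * 1024) // roll files at 1 GB

    source
      .map(record => record) // stands in for the JSON transformation in the report
      .addSink(fileSink)

    env.execute("kafka-to-hdfs")
  }
}
{code}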
[jira] [Created] (FLINK-6098) Cassandra sink freezes after write error
Jakub Nowacki created FLINK-6098:

Summary: Cassandra sink freezes after write error
Key: FLINK-6098
URL: https://issues.apache.org/jira/browse/FLINK-6098
Project: Flink
Issue Type: Bug
Components: Cassandra Connector
Affects Versions: 1.2.0
Environment: Flink 1.2.0, standalone cluster, Debian GNU/Linux 8.7 (jessie)
Reporter: Jakub Nowacki

I am having a problem with a very simple pipeline taking messages from Kafka and writing them into Cassandra. The pipeline runs fine for a number of days, and at some point I get the error below in the taskmanager logs:

{code}
2017-03-13 16:01:50,699 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase - Error while sending value.
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)
{code}

The job appears to keep running, but it does not process any messages, which is only visible in the metrics and in the JobManager log:

{code}
2017-03-14 14:00:44,611 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 42288 @ 1489496444610
2017-03-14 14:00:44,612 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 42288 because of checkpoint decline from task 35926157acfb1b68d1f6c9abcd90c8b4 : Task Source: Custom Source -> Map -> Map -> Sink: Cassandra Sink (1/2) was not running
{code}

I know this is some Cassandra hiccup, but in theory the pipeline should either recover after the failure or fail and stop. Everything else seems fine, and I could not find any information specific to reconnecting after a failure for the Cassandra connector. The only thing I am not sure is done correctly is the ClusterBuilder; i.e. I use the code below in the job definition's main method (in Scala):

{code:java}
val cassandraServers = parameters.get("cassandra.servers", "localhost").split(",")
val cassandraUser = parameters.get("cassandra.user")
val cassandraPassword = parameters.get("cassandra.password")

val clusterBuilder = new ClusterBuilder() {
  override def buildCluster(builder: Cluster.Builder): Cluster = {
    cassandraServers.foreach(builder.addContactPoint)
    if (cassandraUser != null && cassandraPassword != null)
      builder.withCredentials(cassandraUser, cassandraPassword)
    builder.build()
  }
}
{code}

The job starts correctly, but I am not sure whether the configuration from the properties is pulled correctly on the taskmanagers, as I understand the {{buildCluster}} call is made on their side.
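For reference, a minimal sketch (not the reporter's actual job) of how such a ClusterBuilder is usually attached to a stream with the Flink 1.2 Cassandra connector's {{CassandraSink}} builder; the contact points, keyspace, table, and query are hypothetical. The fields captured by the anonymous ClusterBuilder are serialized with the sink when the job graph is built, which is how they reach the taskmanagers:

{code:java}
import com.datastax.driver.core.Cluster
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink, ClusterBuilder}

object CassandraSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Captured by the anonymous ClusterBuilder below and shipped to the taskmanagers.
    val cassandraServers = Array("cassandra-1", "cassandra-2") // hypothetical hosts

    val clusterBuilder = new ClusterBuilder {
      override def buildCluster(builder: Cluster.Builder): Cluster = {
        cassandraServers.foreach(builder.addContactPoint)
        builder.build()
      }
    }

    // Stand-in for the real Kafka source: a stream of (id, payload) Java tuples.
    val messages: DataStream[JTuple2[String, String]] = env
      .fromElements("a" -> "payload-a", "b" -> "payload-b")
      .map(p => new JTuple2[String, String](p._1, p._2))

    // Hypothetical keyspace/table; tuple fields bind to the query placeholders in order.
    CassandraSink.addSink(messages.javaStream)
      .setQuery("INSERT INTO some_keyspace.messages (id, payload) VALUES (?, ?);")
      .setClusterBuilder(clusterBuilder)
      .build()

    env.execute("cassandra-sink-sketch")
  }
}
{code}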
[jira] [Created] (FLINK-5080) Cassandra connector ignores saveAsync result onSuccess
Jakub Nowacki created FLINK-5080:

Summary: Cassandra connector ignores saveAsync result onSuccess
Key: FLINK-5080
URL: https://issues.apache.org/jira/browse/FLINK-5080
Project: Flink
Issue Type: Bug
Components: Cassandra Connector
Affects Versions: 1.1.3
Reporter: Jakub Nowacki

When a record is saved to Cassandra, a ResultSet may be returned to the callback given to saveAsync; e.g. when we do {{INSERT ... IF NOT EXISTS}}, a ResultSet is returned with the column {{applied: false}} if the record already exists and the new record has not been inserted. Thus, we lose data in such a case.

The minimal solution would be to log the result. The best solution would be to allow passing a custom callback; that way we could handle success or failure in a more customized way. Another option is to allow passing onSuccess and onFailure functions, which would be executed inside the callback.
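As an illustration of the kind of result handling the ticket asks for, here is a sketch that inspects the ResultSet in the success callback instead of discarding it, using the DataStax driver's async execution and a Guava FutureCallback; {{session}}, the query string, and the logging calls are placeholders, not the connector's actual internals:

{code:java}
import com.datastax.driver.core.{ResultSet, Session}
import com.google.common.util.concurrent.{FutureCallback, Futures}

// Sketch only: check the outcome of a conditional write instead of ignoring it.
def writeWithResultCheck(session: Session, insertIfNotExists: String): Unit = {
  val future = session.executeAsync(insertIfNotExists)
  Futures.addCallback(future, new FutureCallback[ResultSet] {
    override def onSuccess(rs: ResultSet): Unit = {
      // For lightweight transactions (INSERT ... IF NOT EXISTS) the [applied] column
      // tells us whether the row was actually written.
      if (!rs.wasApplied()) {
        println(s"Conditional write was not applied: $insertIfNotExists")
      }
    }
    override def onFailure(t: Throwable): Unit = {
      println(s"Write failed: ${t.getMessage}")
    }
  })
}
{code}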
[jira] [Created] (FLINK-5070) Unable to use Scala's BeanProperty with classes
Jakub Nowacki created FLINK-5070:

Summary: Unable to use Scala's BeanProperty with classes
Key: FLINK-5070
URL: https://issues.apache.org/jira/browse/FLINK-5070
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.1.3
Reporter: Jakub Nowacki

When using a Scala class with a property (either var or val) annotated with BeanProperty, an exception {{java.lang.IllegalStateException: Detected more than one setter}} is thrown. The simple code below reproduces it:

{code:java}
import scala.beans.BeanProperty

import org.apache.flink.streaming.api.scala._

class SomeClass(@BeanProperty var prop: Int)

object SampleBeanProperty {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    // Create a DataStream from a list of elements
    env.fromElements(1, 2)
      .map(new SomeClass(_))
      .print

    env.execute()
  }
}
{code}

Full exception:

{code}
Exception in thread "main" java.lang.IllegalStateException: Detected more than one setter
	at org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1646)
	at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1692)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1580)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1479)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:737)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:543)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:497)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:493)
	at SampleBeanProperty$.main(SampleBeanProperty.scala:18)
	at SampleBeanProperty.main(SampleBeanProperty.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
{code}

If the class is changed into a case class, the code with BeanProperty works fine.
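For reference, a minimal sketch of the case-class variant the last sentence refers to (the class and object names here are illustrative, not from the report). With a case class, Flink's Scala API derives the type information from the case-class constructor rather than running the POJO analysis, so the extra getter/setter pair generated by @BeanProperty no longer trips up the TypeExtractor:

{code:java}
import scala.beans.BeanProperty

import org.apache.flink.streaming.api.scala._

// Same property and annotation as in the report, but declared as a case class.
case class SomeCaseClass(@BeanProperty var prop: Int)

object SampleBeanPropertyCaseClass {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    env.fromElements(1, 2)
      .map(SomeCaseClass(_))
      .print()

    env.execute()
  }
}
{code}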