Request for realistic Flink programs to check correctness using differential testing

2019-09-09 Thread Filip Niksic
Hi all,

I am part of a research group at the University of Pennsylvania. We are
researching distributed stream processing systems; in particular, we are
developing a framework for checking correctness of Flink programs through
differential testing.
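
To be concrete about what we mean by differential testing: we run the program under test and a simple trusted reference implementation on the same input and compare their outputs. Below is a minimal, Flink-agnostic sketch of the idea (purely illustrative; the names and code are not our actual framework):

object DifferentialTestSketch {
  // `systemUnderTest` would wrap a Flink job executed on a bounded test input;
  // `reference` is a straightforward sequential implementation of the same logic.
  def differentialCheck[A, B](input: Seq[A])(
      systemUnderTest: Seq[A] => Seq[B],
      reference: Seq[A] => Seq[B]): Boolean = {
    // Compare the outputs as multisets, since a parallel job may reorder results.
    def multiset(xs: Seq[B]): Map[B, Int] =
      xs.groupBy(identity).map { case (k, vs) => k -> vs.size }
    multiset(systemUnderTest(input)) == multiset(reference(input))
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(3, 1, 4, 1, 5)
    // Both sides are plain Scala here; in practice the left side would run a Flink pipeline.
    val ok = differentialCheck[Int, Int](input)(_.map(_ * 2).reverse, _.map(_ * 2))
    println(s"outputs equivalent: $ok")
  }
}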

What I want to ask you as a community of Flink users is the following:

1. What infrastructure is available for more sophisticated testing
(fuzzing, random testing, differential testing, etc.) of Flink programs?

2. Do you know of some realistic Flink programs (perhaps a bit more
involved than the examples that come with Flink) that are available online,
and on which we can evaluate our framework?

3. What are some Flink programs that you would be most interested in seeing
more rigorous verification and testing of?

Best regards,

Filip


How to handle the Avro BYTES type in Flink

2019-09-09 Thread Catlyn Kong
Hi fellow streamers,

I'm trying to support the Avro BYTES type in my Flink application. Since
ByteBuffer isn't a supported type, I'm converting the field to an
Array[Byte]:

case Type.BYTES =>
  (avroObj: AnyRef) => {
    if (avroObj == null) {
      null
    } else {
      // Avro decodes BYTES as a java.nio.ByteBuffer; copy the remaining
      // bytes into a fresh Array[Byte] so the Table API can work with it.
      val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
      val bytes = new Array[Byte](byteBuffer.remaining())
      byteBuffer.get(bytes)
      bytes
    }
  }

And in the table, I'm declaring this field as PrimitiveArrayTypeInfo[Byte].
At runtime I'm getting an ArrayIndexOutOfBoundsException:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)

Does anyone have experience deserializing the BYTES type from Avro and making
it compatible with the Table API? I'm wondering whether this is because I
didn't use the correct type, or whether I need to verify that there's enough
data left in the source?
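
For reference, here is roughly how the field could be declared when building the row type (a sketch only; the field names and surrounding schema are made up and are not my actual code):

import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object BytesFieldTypeSketch {
  // "id" and "payload" are placeholder field names.
  val rowType: RowTypeInfo = new RowTypeInfo(
    Array[TypeInformation[_]](
      Types.STRING,                                           // some other field
      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO   // the Avro BYTES field as Array[Byte]
    ),
    Array("id", "payload")
  )
}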

Any input is appreciated.

Thanks!
Catlyn


Flink SQL problem

2019-09-09 Thread davran.muzavarov
Hi, I have encountered a problem with Flink SQL.

 

My code:

 

DataSet dataSet0 = env.fromCollection( infos0 );
tableEnv.registerDataSet( "table0", dataSet0 );

String sql = "select closePrice from table0";

Table table = tableEnv.sql( sql );
tableEnv.registerTable( tableName, table );

DataSet<Row> readyData = tableEnv.toDataSet( table, Row.class );

This works fine.

 

But when I change the SQL to "select distinct closePrice from table0",
tableEnv.toDataSet throws an exception:

 

java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule
at org.apache.calcite.util.Util.newInternal(Util.java:792)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)
at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
at com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize(FlinkDataSetAnalytics.java:96)
at com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(AnalyticsServlet.java:117)
at com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsServlet.java:40)
at com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsServlet.java:35)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:229)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:366)
at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:973)
at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1035)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231)
at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered
at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)
at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
  

Using FlinkKafkaConsumer API

2019-09-09 Thread Vishwas Siravara
I am using the flink-kafka-connector and this is my dependency:

"org.apache.flink" %% "flink-connector-kafka" % *"1.7.0"*,


When I look at my dependency tree, the Kafka client version is
org.apache.kafka:kafka-clients:2.0.1, which comes from the above package.


However, when I run my code in the cluster, I see that the kafka-clients
version that is loaded is 0.10.2.0.


Here is the task executor log:

2019-09-09 03:05:56,825 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.0

I am struggling to find out where this dependency is coming from. Our
broker version is not compatible with this client. How can I force Flink
to use 2.0.1?
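
One thing I am considering (a sketch, not something I have verified) is pinning the transitive version in the sbt build:

// build.sbt -- sketch only. dependencyOverrides pins the transitive kafka-clients
// version in my build; whether the cluster classpath still wins is a separate question.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.0.1"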


Also, the API I use for the Kafka consumer is:

private[flink] def sourceType: FlinkKafkaConsumer[GenericRecord] = {
  val consumer = new FlinkKafkaConsumer[GenericRecord](
    source.asJava,
    AvroDeserialization.genericRecd,
    ExecutionEnv.streamProperties)
  consumer
}

}


I really appreciate the help. Is there any way I can find out where this
dependency comes from in the cluster, as it is clearly not coming from my
application?
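
If it helps, one thing I could do (a sketch, assuming plain JVM reflection, dropped into the job before the source is created) is log where the Kafka client class is actually loaded from at runtime:

// Prints the jar that kafka-clients is loaded from on the task manager.
// KafkaConsumer is just a convenient class from that artifact.
val kafkaClientsLocation = classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
  .getProtectionDomain.getCodeSource.getLocation
println(s"kafka-clients loaded from: $kafkaClientsLocation")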



Thanks,

Vishwas


StreamingFileSink rolling callback

2019-09-09 Thread Anton Parkhomenko
Hello,

I’m writing a Flink job that reads heterogeneous data (one row contains several
types that need to be partitioned downstream) from AWS Kinesis and writes to an
S3 directory structure like s3://bucket/year/month/day/hour/type. This all works
great with StreamingFileSink in Flink 1.9, but the problem is that I need to
immediately (or “as soon as possible”, rather) let another application know when
an “hour” bucket has rolled (i.e. we’re 100% sure it won’t write any more data
for this hour). Another problem is that the data can be very skewed across types,
e.g. one hour can contain 90% of rows with typeA, 30% of rows with typeB and 1%
of rows with typeC.

My current plan is to:

1. Split the stream into windows using TumblingProcessingTimeWindows (I don’t
care about event time at all)
2. Assign every row its bucket in a windowing function
3. Write a stateful BucketAssigner that:
3.1. Keeps its last window in a mutable variable
3.2. Once it receives a row with a newer window, sends a message to SQS and
advances the window

My biggest concern now is about the 3rd point. To me, BucketAssigner looks like
a pure function of (Row, Time) -> Bucket, and I’m not sure that introducing
state and side effects there would be reasonable. Are there any other ways to
do it? I’m also thinking about how I should couple this with the checkpointing
mechanism, as ideally I’d like to not invoke this callback before the checkpoint
is written.
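
To make point 3 concrete, here is a rough sketch of what I have in mind (the Event type and the notifyHourRolled callback are placeholders, not real APIs; the SQS call would live behind the callback, which would also have to be serializable):

import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Placeholder row type; in reality this comes out of the windowing function.
case class Event(eventType: String, payload: String)

class NotifyingBucketAssigner(notifyHourRolled: String => Unit)
    extends BucketAssigner[Event, String] {

  // Per-subtask mutable state: the last hour bucket this assigner instance has seen.
  @transient private var lastHour: String = _

  override def getBucketId(element: Event, context: BucketAssigner.Context): String = {
    val hour = hourOf(context.currentProcessingTime())
    if (lastHour != null && hour != lastHour) {
      // Side effect, e.g. an SQS message. This fires once per subtask and is not
      // coordinated with checkpoints -- exactly the concern described above.
      notifyHourRolled(lastHour)
    }
    lastHour = hour
    s"$hour/${element.eventType}"
  }

  override def getSerializer(): SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE

  private def hourOf(epochMillis: Long): String =
    java.time.Instant.ofEpochMilli(epochMillis)
      .atZone(java.time.ZoneOffset.UTC)
      .format(java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd/HH"))
}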

StreamingFileSink does not provide many ways to extend it. I tried to
re-implement it for my purposes, but stumbled upon many private methods and
classes, so even though it looks possible, the end result would probably be
too ugly.

To make things a little bit easier, I don’t care too much about the delivery
semantics of those final SQS messages: if I get only ~99% of them, that’s fine;
if some of them end up duplicated, that’s also fine.

Regards,
Anton


Re: Will there be a Flink 1.9.1 release ?

2019-09-09 Thread Debasish Ghosh
Thanks Kurt. I was just asking as it would help us a lot with the issue (
https://github.com/apache/flink/pull/9565) that I mentioned in my mail. It
got merged recently (after the 1.9.0 release).

> BTW, the issue you mentioned didn't get fixed in 1.9.1.


Will you please explain what you mean by this? The issue that I mentioned got
merged recently, after the 1.9.0 release.

regards.

On Mon, Sep 9, 2019 at 3:34 PM Kurt Young  wrote:

> Hi Debasish,
>
> I think there is a good chance we will have 1.9.1; the only question is when.
> 1.9.0 was released ~2 weeks ago, and I think some users are still migrating
> to it. Waiting another 1 or 2 weeks, and also seeing whether there are any
> critical bugs in 1.9.0, sounds reasonable to me.
>
> BTW, the issue you mentioned didn't get fixed in 1.9.1.
>
> Best,
> Kurt
>
>
> On Mon, Sep 9, 2019 at 5:54 PM Kostas Kloudas  wrote:
>
>> Hi Debasish,
>>
>> So far I am not aware of any concrete timeline for Flink 1.9.1 but
>> I think that Gordon and Kurt (cc'ed) who were the release-1.9
>> managers are the best to answer this question.
>>
>> Cheers,
>> Kostas
>>
>> On Mon, Sep 9, 2019 at 9:38 AM Debasish Ghosh 
>> wrote:
>> >
>> > Hello -
>> >
>> > Is there a plan for a Flink 1.9.1 release in the short term? We are
>> > using Flink and Avro with Avrohugger generating Scala case classes from
>> > Avro schema. Hence we need https://github.com/apache/flink/pull/9565
>> > which has been closed recently.
>> >
>> > regards.
>> >
>> >
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Will there be a Flink 1.9.1 release ?

2019-09-09 Thread Kurt Young
Hi Debasish,

I think there is a good chance we will have 1.9.1; the only question is when.
1.9.0 was released ~2 weeks ago, and I think some users are still migrating
to it. Waiting another 1 or 2 weeks, and also seeing whether there are any
critical bugs in 1.9.0, sounds reasonable to me.

BTW, the issue you mentioned didn't get fixed in 1.9.1.

Best,
Kurt


On Mon, Sep 9, 2019 at 5:54 PM Kostas Kloudas  wrote:

> Hi Debasish,
>
> So far I am not aware of any concrete timeline for Flink 1.9.1 but
> I think that Gordon and Kurt (cc'ed) who were the release-1.9
> managers are the best to answer this question.
>
> Cheers,
> Kostas
>
> On Mon, Sep 9, 2019 at 9:38 AM Debasish Ghosh 
> wrote:
> >
> > Hello -
> >
> > Is there a plan for a Flink 1.9.1 release in the short term? We are
> > using Flink and Avro with Avrohugger generating Scala case classes from
> > Avro schema. Hence we need https://github.com/apache/flink/pull/9565
> > which has been closed recently.
> >
> > regards.
> >
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
>


Re: Will there be a Flink 1.9.1 release ?

2019-09-09 Thread Kostas Kloudas
Hi Debasish,

So far I am not aware of any concrete timeline for Flink 1.9.1 but
I think that Gordon and Kurt (cc'ed) who were the release-1.9
managers are the best to answer this question.

Cheers,
Kostas

On Mon, Sep 9, 2019 at 9:38 AM Debasish Ghosh  wrote:
>
> Hello -
>
> Is there a plan for a Flink 1.9.1 release in the short term? We are using
> Flink and Avro with Avrohugger generating Scala case classes from Avro
> schema. Hence we need https://github.com/apache/flink/pull/9565 which has
> been closed recently.
>
> regards.
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg


Will there be a Flink 1.9.1 release ?

2019-09-09 Thread Debasish Ghosh
Hello -

Is there a plan for a Flink 1.9.1 release in the short term? We are using
Flink and Avro with Avrohugger generating Scala case classes from Avro
schema. Hence we need https://github.com/apache/flink/pull/9565 which has
been closed recently.

regards.


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: suggestion of FLINK-10868

2019-09-09 Thread Till Rohrmann
Hi Anyang,

I think we cannot take your proposal, because it means that whenever we
want to call notifyAllocationFailure while there is a connection problem
between the RM and the JM, we fail the whole cluster. This is something a
robust and resilient system should not do, because connection problems are
expected and need to be handled gracefully. Instead, if one deems the
notifyAllocationFailure message to be very important, then one would need to
keep it and tell the JM once it has reconnected.
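
As a minimal illustration of that idea (plain Scala, not the actual ResourceManager code; all names here are made up): buffer the notification per job while the JM is unreachable, and deliver it once the JM registers again.

import scala.collection.mutable

// Illustrative only: what "keep the notification and deliver it later" could look like.
final case class AllocationFailure(jobId: String, allocationId: String, cause: Throwable)

class PendingAllocationFailures {
  private val pending = mutable.Map.empty[String, mutable.Buffer[AllocationFailure]]

  // Called when the JM for this job is currently not connected.
  def buffer(failure: AllocationFailure): Unit =
    pending.getOrElseUpdate(failure.jobId, mutable.Buffer.empty[AllocationFailure]) += failure

  // Called once the JM has (re-)registered: deliver everything buffered for that job.
  def flush(jobId: String)(deliver: AllocationFailure => Unit): Unit =
    pending.remove(jobId).foreach(_.foreach(deliver))
}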

Cheers,
Till

On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu  wrote:

> Hi Peter,
>
> For our online batch tasks, there is a scenario where the number of failed
> Containers reaches MAXIMUM_WORKERS_FAILURE_RATE but the client does not
> immediately exit (the probability of losing the JM increases greatly when
> thousands of Containers are being started). We found that a JM disconnection
> (the reason for the JM loss is unknown) causes notifyAllocationFailure to
> have no effect.
>
> After the introduction of FLINK-13184 to start containers with multiple
> threads, the JM disconnection situation has been alleviated. In order to
> reliably make the client exit immediately, we use the following code to
> decide whether to call onFatalError when
> MaximumFailedTaskManagerExceedingException occurs:
>
> @Override
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>     validateRunsInMainThread();
>
>     JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>     if (jobManagerRegistration != null) {
>         jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>     } else {
>         if (exitProcessOnJobManagerTimedout) {
>             ResourceManagerException exception = new ResourceManagerException(
>                 "Job Manager is lost, can not notify allocation failure.");
>             onFatalError(exception);
>         }
>     }
> }
>
>
> Best regards,
>
> Anyang
>
>