Re: UI stability at high parallelism

2020-02-14 Thread Weihua Hu
These logs confirm that it is indeed a timeout issue. In our scenario, it was because 
task deployment took a long time.
You can check in the log whether the time for a task to go from SCHEDULED to DEPLOYING is 
greater than 10s. This step is processed in the main thread and will block the 
processing of requests from the UI. 

For now, you can increase 'akka.ask.timeout' to avoid this. 
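
For example, in flink-conf.yaml (the value is illustrative; the default is 10 s):

```
akka.ask.timeout: 60 s
```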

I have created a JIRA issue to improve this: 
https://issues.apache.org/jira/browse/FLINK-16069

Best
Weihua Hu

> On Feb 15, 2020, at 01:54, Richard Moorhead wrote:
> 
> 2020-02-14 11:50:35,402 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Unhandled 
> exception.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#1293527273]] after [1 ms]. Message of 
> type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
> reason for `AskTimeoutException` is that the recipient actor didn't send a 
> reply.
>   at 
> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>   at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>   at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> On Wed, Feb 12, 2020 at 11:30 PM HuWeihua wrote:
> Hi, Richard
> 
> Most likely the REST API has timed out; you can try to find some 
> evidence in the jobmanager log.
> 
> You can provide the full log to help us find the root cause.
> 
> 
> Best
> Weihua Hu
> 
>> On Feb 13, 2020, at 09:40, Richard Moorhead wrote:
>> 
>> When I submit a job to flink session with parallelism higher than 128, the 
>> job is submitted and renders in the UI but when I view the job itself the UI 
>> starts to rapidly emit errors in the upper right:
>> 
>> Server Response:
>> Unable to load requested file /bad-request.
>> 
>> Is this a known issue? Is there a fix? Does this indicate underlying 
>> stability issues?
> 



KafkaFetcher closed before end of stream is received for all partitions.

2020-02-14 Thread Bill Wicker
Hello all,

I'm new to Flink and I have been developing a series of POCs in preparation for 
a larger project that will utilize Flink. One use case we have is to utilize 
the same job for both batch and streaming processing using Kafka as the source. 
When the job is run in batch mode we expect that it will be started, consume 
all data from the Kafka topic and then shutdown. I have achieved this using the 
isEndOfStream method in the KafkaDeserializationSchema; however, I have one 
outstanding problem that may be a bug.
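
For reference, the end-of-stream schema described above might look roughly like the
following sketch (the String payload and the "EOS" marker are assumptions, not part of
the actual setup):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: stops the consuming sub-task once a marker message is seen.
public class EndOfStreamAwareSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Assumption: the producer marks the last message of a partition with "EOS".
        return "EOS".equals(nextElement);
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```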

The problem arises when the source parallelism is less than the topic's 
partition count. In this case, there will be a single KafkaFetcher instance per 
sub-task and each instance may be responsible for multiple partitions. My 
producer publishes some data and then publishes a final message to each 
partition with a flag indicating it is the end of stream. When the KafkaFetcher 
receives an end of stream message it stops running and closes. However, the 
shutdown gets invoked when the first end of stream message is received even 
though other partitions that the fetcher is responsible for may not yet have 
reached the end of stream. In this case, when the job is started again, some of 
the EOS messages remain on the topic and will immediately be consumed causing 
some of the sub-tasks to exit before consuming any new data.

Is this the expected behavior, i.e. that each KafkaFetcher will stop running as 
soon as a single end of stream message is received rather than waiting for all 
partitions to receive one? If so, is there some other way to achieve my goal of 
having the application gracefully shutdown once all data has been consumed from 
the topic?

A few assumptions:

  1.  The producer is a legacy application that produces to Kafka and can't be 
changed
  2.  Once the producer is done publishing, another application will be invoked 
to publish the EOS messages and then launch the Flink job
  3.  The Flink job should exit automatically once all data has been consumed
  4.  There should not be any unconsumed EOS messages.

Thank you  in advance!

Bill Wicker | +1 347-369-4580
Software Development Consultant, Risk Focus
New York | London | Riga | Pittsburgh | Toronto



Flink Savepoint error

2020-02-14 Thread Soheil Pourbafrani
Hi,

I developed a Flink application that reads data from files and inserts it
into a database. While the job was running, I attempted to take a savepoint
and cancel the job, but I got the following error:

Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger
> savepoint. Decline reason: Not all required tasks are currently running.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
> at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
> at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:966)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> ... 15 more
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointTriggerException:
> Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:379)


The Flink version is 1.7.2 and I am using Oracle Java 1.8.0_91-b14.

Any idea?

Best regards,
Soheil


TaskManager Fail when I cancel the job and crash

2020-02-14 Thread Soheil Pourbafrani
Hi,

I developed a single Flink job that reads a huge number of files and, after
some simple preprocessing, sinks them into a database. I use the built-in
JDBCOutputFormat for inserting records into the database. The problem is that
when I cancel the job using either the WebUI or the command line, the job
does not cancel completely and eventually the TaskManager process crashes!
Here are the TaskManager logs (generated continuously for a few seconds):

2020-02-15 01:17:17,208 WARN
>  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
>  - The reader is stuck in method:
>  java.lang.Object.wait(Native Method)
> org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
> org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
>
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
>
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
>
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
>
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
> 2020-02-15 01:17:17,224 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.
> 2020-02-15 01:17:17,225 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.


I'm using:
Flink: 1.7.2,
Java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)

Any help will be appreciated.

All the best,
Soheil


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-02-14 Thread John Smith
Hi Piotr, any thoughts on this?

On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas wrote:

> Hi John,
>
> As you suggested, I would also lean towards increasing the number of
> allowed open handles, but
> for recommendation on best practices, I am cc'ing Piotr who may be
> more familiar with the Kafka consumer.
>
> Cheers,
> Kostas
>
> On Tue, Feb 11, 2020 at 9:43 PM John Smith  wrote:
> >
> > Just wondering, is this on the client side in the Flink job? I rebooted
> the task and the job deployed correctly on another node.
> >
> > Is there a specific ulimit that we should set for Flink task nodes?
> >
> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
> > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException:
> Too many open files
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
> > ... 11 more
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
> > ... 14 more
>
>


Re: UI stability at high parallelism

2020-02-14 Thread Richard Moorhead
2020-02-14 11:50:35,402 ERROR
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler - Unhandled
exception.
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#1293527273]] after [1 ms]. Message
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
typical reason for `AskTimeoutException` is that the recipient actor didn't
send a reply.
at
akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at
akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at
akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
at
akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:748)

On Wed, Feb 12, 2020 at 11:30 PM HuWeihua  wrote:

> Hi, Richard
>
> Most likely the REST API has timed out; you can try to find
> some evidence in the jobmanager log.
>
> You can provide the full log to help us find the root cause.
>
>
> Best
> Weihua Hu
>
> On Feb 13, 2020, at 09:40, Richard Moorhead wrote:
>
> When I submit a job to flink session with parallelism higher than 128, the
> job is submitted and renders in the UI but when I view the job itself the
> UI starts to rapidly emit errors in the upper right:
>
> Server Response:
> Unable to load requested file /bad-request.
>
> Is this a known issue? Is there a fix? Does this indicate underlying
> stability issues?
>
>
>


RE: Table API: Joining on Tables of Complex Types

2020-02-14 Thread Hailu, Andreas
Hi Timo, Dawid,

This was very helpful - thanks! The Row type seems to only support getting 
fields by their index. Is there a way to get a field by its name like the Row 
class in Spark? Link: 
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Row.html#getAs(java.lang.String)

Our use case is that we're developing a data-processing library for developers 
leveraging our system to refine existing datasets and produce new ones. The 
flow is as follows:

Our library reads Avro/Parquet GenericRecord data files from a source and turns 
them into a Table --> users write a series of operations on this Table to create 
a new resulting Table --> the resulting Table is then transformed and persisted back to 
the file system as Avro GenericRecords in an Avro/Parquet file.

We can map the Row field names to their corresponding indexes by patching the 
AvroRowDeserializationSchema class, but it's the step where we expose the 
Table to our users and then try to persist it that ends up in this 
metadata loss. We know which fields the Table must be composed of, but we just 
won't know which index they live at, so Row#getField() isn't quite what we 
need.
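
For illustration, a rough sketch of the index-by-name lookup described above (a fragment;
it assumes the DataSet's type is a RowTypeInfo, and the field name is made up):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// rows: DataSet<Row> produced by the GenericRecord-to-Row conversion
RowTypeInfo rowType = (RowTypeInfo) rows.getType();
int fieldIdx = rowType.getFieldIndex("myField"); // -1 if the field does not exist

DataSet<String> values = rows
    .map(row -> (String) row.getField(fieldIdx))
    .returns(Types.STRING);
```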

// ah

-Original Message-
From: Timo Walther 
Sent: Friday, January 17, 2020 11:29 AM
To: user@flink.apache.org
Subject: Re: Table API: Joining on Tables of Complex Types

Hi Andreas,

If dataset.getType() returns a RowTypeInfo, you can ignore this log message. The 
type extractor runs before ".returns()", but with this method you override 
the old type.

Regards,
Timo


On 15.01.20 15:27, Hailu, Andreas wrote:
> Dawid, this approach looks promising. I'm able to flatten out my Avro
> records into Rows and run simple queries atop of them. I've got a
> question - when I register my Rows as a table, I see the following log
> providing a warning:
>
> 2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a getter for field fields
>
> 2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a setter for field fields
>
> 2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class
> org.apache.flink.types.Row cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType.
> Please read the Flink documentation on "Data Types & Serialization"
> for details of the effect on performance.
>
> Will this be problematic even now that we've provided TypeInfos for
> the Rows? Performance is something that I'm concerned about as I've
> already introduced a new operation to transform our records to Rows.
>
> // ah
>
> *From:* Hailu, Andreas [Engineering]
> *Sent:* Wednesday, January 8, 2020 12:08 PM
> *To:* 'Dawid Wysakowicz' ;
> mailto:user@flink.apache.org
> *Cc:* Richards, Adam S [Engineering] 
> *Subject:* RE: Table API: Joining on Tables of Complex Types
>
> Very well - I'll give this a try. Thanks, Dawid.
>
> // ah
>
> *From:* Dawid Wysakowicz  >
> *Sent:* Wednesday, January 8, 2020 7:21 AM
> *To:* Hailu, Andreas [Engineering]  >; mailto:user@flink.apache.org
> 
> *Cc:* Richards, Adam S [Engineering]  >
> *Subject:* Re: Table API: Joining on Tables of Complex Types
>
> Hi Andreas,
>
> Converting your GenericRecords to Rows would definitely be the safest
> option. You can check how its done in the
> org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can
> reuse the logic from there to write something like:
>
>  DataSet<GenericRecord> dataset = ...
>
>  dataset.map( /* convert GenericRecord to Row
> */).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));
>
> Another thing you could try is to make sure that GenericRecord is seen
> as an Avro type by Flink (Flink should understand that the Avro type is a
> complex type):
>
>  dataset.returns(new GenericRecordAvroTypeInfo(/* schema string */));
>
> then the TableEnvironment should pick it up as a structured type and
> flatten it automatically when registering the Table. Bear in mind the
> returns method is part of SingleInputUdfOperator so you can apply it
> right after some transformation e.g. map/flatMap etc.
>
> Best,
>
> Dawid
>
> On 06/01/2020 18:03, Hailu, Andreas wrote:
>
> Hi David, thanks for getting back.
>
>  From what you've said, I think we'll need to convert our
> GenericRecord into structured types - do you have any references or
> examples I can have a look at? If not, perhaps you could just show
> me a basic example of flattening a complex object with accessors
> into a Table of structured types. Or by structured types, did you
> mean Row?
>
> // ah
>
> *From:* Dawid Wysakowicz 
> 
> *Sent:* Monday, January 

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-14 Thread Salva Alcántara
Hi Piotr,

Since my current process function already works well for me, except for the
fact that I don't have access to the mailbox executor, I have simply created a
custom operator to inject it:

```
class MyOperator(myFunction: MyFunction)
  extends KeyedCoProcessOperator(myFunction)
{

  private lazy val mailboxExecutor = getContainingTask
.getMailboxExecutorFactory
.createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
super.open()
userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}
```

This way I can send mails just fine... In the main application I use it like
this:

```
.transform("wrapping my function with my operator", new MyOperator(new
MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better
way, it would be great to hear your thoughts on this again. In particular,
the way of getting access to the mailbox executor is a bit clumsy...



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Persisting inactive state outside Flink

2020-02-14 Thread Akshay Aggarwal
Hi,

We have a use case where we have to persist some state information about a
device forever. Each new event will fetch the keyed state and update it.
And this has to be applied in order of events.

The problem is that the number of devices (keys) will keep growing
infinitely. Usually a device comes online, stays active for a while
(generates new events) and then goes into dormant mode. Is there a way we
can persist the state outside of Flink (say HBase) when the device goes
dormant and fetch it later when it becomes active again?

I know we can do this in a process function using timers. But then I'll have
to make a synchronous call to the external store every time a new device
comes alive, or when an active device goes dormant, which will stall the
task and become a scalability bottleneck. Using AsyncIO also doesn't seem
to be an option.
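
For reference, the timer-based dormancy detection mentioned above might look roughly like
the sketch below (the Event type, the threshold, and the external-store call are
assumptions, not part of the original setup):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// `Event` stands in for the device event type (assumption).
public class DormancyDetector extends KeyedProcessFunction<String, Event, Event> {

    private static final long DORMANT_AFTER_MS = 60 * 60 * 1000; // example threshold

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Types.LONG));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // One timer per event; stale timers are filtered out in onTimer below.
        ctx.timerService().registerProcessingTimeTimer(now + DORMANT_AFTER_MS);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Long last = lastSeen.value();
        if (last != null && timestamp >= last + DORMANT_AFTER_MS) {
            // The key has been inactive for the threshold: this is where its state
            // could be offloaded to an external store such as HBase.
        }
    }
}
```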

Is there a way to achieve this without hacking into Flink code?

Thanks,
Akshay Aggarwal




Re: [ANNOUNCE] Flink Forward San Francisco 2020 Program is Live

2020-02-14 Thread Fabian Hueske
Hi everyone,

Sorry for writing another email but I forgot to mention the community
discounts.
When you register for the conference [1], you can use one of the following
discount codes:

* As a member of the Flink community we offer a 50% discount on your
conference pass if you register with the code: FFSF20-MailingList
* If you are an Apache committer (for any project), we offer a *free*
conference pass if you register with your Apache email address and the
discount code: FFSF20-ApacheCommitter

Have a nice weekend,
Fabian

[1] https://events.evolutionaryevents.com/flink-forward-sf-2020

Am Fr., 14. Feb. 2020 um 17:48 Uhr schrieb Fabian Hueske :

> Hi everyone,
>
> We announced the program of Flink Forward San Francisco 2020.
> The conference takes place at the Hyatt Regency in San Francisco from
> March 23rd to 25th.
>
> On the first day we offer four training sessions [1]:
> * Apache Flink Developer Training
> * Apache Flink Runtime & Operations Training
> * Apache Flink Tuning & Troubleshooting Training
> * Apache Flink SQL Developer Training
>
> On day two and three we have a great lineup of talks [2], including
> speakers from AWS, Bird, Cloudera, Lyft, Netflix, Splunk, Uber, Yelp,
> Alibaba, Ververica and others.
>
> Flink Forward is an excellent opportunity to learn about Flink use cases,
> recent Flink features, and best practices of running Flink applications in
> production.
> It's also a great place to connect and mingle with the Flink community.
>
> You can register for the event at
> -> https://events.evolutionaryevents.com/flink-forward-sf-2020
>
> Hope to see you there,
> Fabian
>
> [1] https://www.flink-forward.org/sf-2020/training-program
> [2] https://www.flink-forward.org/sf-2020/conference-program
>


[Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-14 Thread Maxim Parkachov
Hi everyone,

I'm trying to run my job with Flink 1.10 in YARN per-job cluster mode. In
previous versions, all files in the lib/ folder were automatically included
in the classpath. Now, with 1.10, I see that only *.jar files are included in
the classpath, but not "other" files. Is this a deliberate change or a bug?

Generally, what is the recommended way to include custom files in the classpath
and ship them to all containers at startup?

Thanks


[ANNOUNCE] Flink Forward San Francisco 2020 Program is Live

2020-02-14 Thread Fabian Hueske
Hi everyone,

We announced the program of Flink Forward San Francisco 2020.
The conference takes place at the Hyatt Regency in San Francisco from March
23rd to 25th.

On the first day we offer four training sessions [1]:
* Apache Flink Developer Training
* Apache Flink Runtime & Operations Training
* Apache Flink Tuning & Troubleshooting Training
* Apache Flink SQL Developer Training

On day two and three we have a great lineup of talks [2], including
speakers from AWS, Bird, Cloudera, Lyft, Netflix, Splunk, Uber, Yelp,
Alibaba, Ververica and others.

Flink Forward is an excellent opportunity to learn about Flink use cases,
recent Flink features, and best practices of running Flink applications in
production.
It's also a great place to connect and mingle with the Flink community.

You can register for the event at
-> https://events.evolutionaryevents.com/flink-forward-sf-2020

Hope to see you there,
Fabian

[1] https://www.flink-forward.org/sf-2020/training-program
[2] https://www.flink-forward.org/sf-2020/conference-program


Re:Re: Flink 1.10 es sink exception

2020-02-14 Thread sunfulin
Hi Jark,
Thanks for your reply. Insert with a column list is indeed not allowed with the 
old planner in Flink 1.10; it throws an exception such as 
"Partial insert is not supported". 
Never mind that issue. Focusing on the UpsertMode exception, my ES DDL is 
the following: 


CREATE TABLE ES6_ZHANGLE_OUTPUT (
  aggId varchar ,
  pageId varchar ,
  ts varchar ,
  expoCnt bigint ,
  clkCnt bigint
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093',
'connector.index' = 'flink_zhangle_pageview',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)




And the SQL logic is the following:


INSERT INTO ES6_ZHANGLE_OUTPUT
  SELECT aggId, pageId, ts_min as ts,
  count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
  count(case when eventId = 'click' then 1 else null end) as clkCnt
  FROM
  (
SELECT
'ZL_001' as aggId,
pageId,
eventId,
recvTime,
ts2Date(recvTime) as ts_min
from kafka_zl_etrack_event_stream
where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min


I simply run StreamTableEnvironment.sqlUpdate() with the above SQL content and 
execute the job. I am not sure what the root cause is. 





At 2020-02-14 23:19:14, "Jark Wu"  wrote:

Hi sunfulin,


Is this the real query you submit?  AFAIK, insert with column list is not 
allowed for now, 
i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)`.


Could you attach the full SQL text, including  DDLs of ES6_ZHANGLE_OUTPUT table 
and kafka_zl_etrack_event_stream table.
If you have a minimal program that can reproduce this problem, that would be 
great. 


Best,
Jark


On Fri, 14 Feb 2020 at 22:53, Robert Metzger  wrote:




-- Forwarded message -
From: sunfulin
Date: Fri, Feb 14, 2020 at 2:59 AM
Subject: Re:Flink 1.10 es sink exception
To: user@flink.apache.org 




Can anyone share a little advice on the reason for this exception? When I changed to 
the old planner, the same SQL ran well. 

At 2020-02-13 16:07:18, "sunfulin"  wrote:

Hi guys,
When running the Flink SQL below, I got an exception like 
"org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated". I am using the latest Flink 
1.10 release with the blink planner enabled, and the same logic runs well 
with the Flink 1.8.2 old planner. Does the SQL usage have some problem, or may 
there be a bug here? 




INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
  SELECT aggId, pageId, ts_min as ts,
  count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
  count(case when eventId = 'click' then 1 else null end) as clickCnt
  FROM
  (
SELECT
'ZL_001' as aggId,
pageId,
eventId,
recvTime,
ts2Date(recvTime) as ts_min
from kafka_zl_etrack_event_stream
where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min






 





 

Re: How to fully re-aggregate a keyed windowed aggregate in the same window ?

2020-02-14 Thread Robert Metzger
Hey Arnaud,
sorry that you didn't get an answer yet. Were you able to solve your
problem in the meantime? If not, I'll find somebody to answer your question
:)

On Thu, Jan 30, 2020 at 9:18 AM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> I would like to compute statistics on a stream every hour. For that, I
> need to compute statistics on the keyed stream, then to reaggregate them.
>
> I’ve tried the following thing :
>
>
>
> stream.keyBy(mykey)
>
> .window(1 hour process time)
>
> .aggregate(my per-key aggregate)
>
>
>
> .windowAll(1 hour process time) // not the same window, add
> one hour delay…
>
>
>
> .reduce(fully aggregate intermediary results)
>
>  ... then sink
>
>
>
> This works, but I get the first line in the sink 2 hours after the first
> item in the sink, and 1 hour after it should be possible to get it.
>
>
>
> My question: How do I trigger the reduce step immediately after the first
> aggregation?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
>


Re: Flink 1.10 es sink exception

2020-02-14 Thread Jark Wu
Hi sunfulin,

Is this the real query you submit?  AFAIK, insert with column list is not
allowed for now,
i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt,
clkCnt)`.

Could you attach the full SQL text, including  DDLs of ES6_ZHANGLE_OUTPUT
table and kafka_zl_etrack_event_stream table.
If you have a minimal program that can reproduce this problem, that would
be great.

Best,
Jark

On Fri, 14 Feb 2020 at 22:53, Robert Metzger  wrote:

>
>
> -- Forwarded message -
> From: sunfulin 
> Date: Fri, Feb 14, 2020 at 2:59 AM
> Subject: Re:Flink 1.10 es sink exception
> To: user@flink.apache.org 
>
>
> Can anyone share a little advice on the reason for this exception? I
> changed to the old planner, and the same SQL ran well.
>
>
>
>
>
> At 2020-02-13 16:07:18, "sunfulin"  wrote:
>
> Hi guys,
> When running the Flink SQL below, I got an exception like
> "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
> that Table has a full primary keys if it is updated". I am using the latest
> Flink 1.10 release with the blink planner enabled, and the same logic runs
> well with the Flink 1.8.2 old planner. Does the SQL usage have some problem,
> or may there be a bug here?
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>   SELECT aggId, pageId, ts_min as ts,
>   count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
>   count(case when eventId = 'click' then 1 else null end) as clickCnt
>   FROM
>   (
> SELECT
> 'ZL_001' as aggId,
> pageId,
> eventId,
> recvTime,
> ts2Date(recvTime) as ts_min
> from kafka_zl_etrack_event_stream
> where eventId in ('exposure', 'click')
>   ) as t1
>   group by aggId, pageId, ts_min
>
>
>
>
>
>
>
>
>


Re: Re: Flink connect hive with hadoop HA

2020-02-14 Thread Robert Metzger
There's a configuration value "env.hadoop.conf.dir" to set the hadoop
configuration directory:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#env-hadoop-conf-dir
If the files in that directory correctly configure Hadoop HA, the client
side should pick up the config.
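
For example, assuming the client machine has a copy of the cluster's Hadoop configuration
(the path is illustrative):

```
# flink-conf.yaml
env.hadoop.conf.dir: /etc/hadoop/conf
```

Setting the HADOOP_CONF_DIR environment variable to the same directory is an alternative.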

On Tue, Feb 11, 2020 at 3:39 AM sunfulin  wrote:

> Hi guys,
> Thanks for the kind reply. Actually, I want to know how to change the client-side
> Hadoop conf while using the Table API within my program. I hope for some useful suggestions.
>
>
>
>
>
> At 2020-02-11 02:42:31, "Bowen Li"  wrote:
>
> Hi sunfulin,
>
> Sounds like you didn't configure Hadoop HA correctly on the client side
> according to [1]. Let us know if it helps resolve the issue.
>
> [1]
> https://stackoverflow.com/questions/25062788/namenode-ha-unknownhostexception-nameservice1
>
>
>
>
> On Mon, Feb 10, 2020 at 7:11 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi,
>>
>> Could you please provide a full stacktrace?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:
>>
>>> Hi, guys
>>> I am using Flink 1.10 and testing functional cases with Hive integration:
>>> Hive 1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can
>>> see a successful connection to the Hive metastore, but I cannot read table data;
>>> I get this exception:
>>>
>>> java.lang.IllegalArgumentException: java.net.UnknownHostException:
>>> nameservice1
>>> at
>>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
>>> at
>>> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>>> at
>>> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>>
>>> I am running a standalone application. It looks like I am missing my Hadoop
>>> conf file in my Flink job application classpath. Where should I configure this?
>>>
>>>
>>>
>>>
>>
>
>
>


Re: 1.9 timestamp type default

2020-02-14 Thread Timo Walther

Hi,

the type system is still under heavy refactoring that touches a lot of 
interfaces. Where would you like to use java.sql.Timestamp? UDFs are not 
well supported right now. Sources and sinks might work for the Blink 
planner, and java.sql.Timestamp is the only supported conversion class of the 
old planner. Usage is as Rui explained.


Regards,
Timo


On 14.02.20 02:37, Rui Li wrote:

Hi,

I don't think there's a config to change the default behavior. But you 
can change the bridged class programmatically like: 
DataTypes.TIMESTAMP(9).bridgedTo(java.sql.Timestamp.class)
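
For illustration, that bridging can be applied when declaring a schema programmatically,
e.g. (a sketch; the field name is made up):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

TableSchema schema = TableSchema.builder()
    .field("ts", DataTypes.TIMESTAMP(9).bridgedTo(java.sql.Timestamp.class))
    .build();
```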


On Fri, Feb 14, 2020 at 8:47 AM Fanbin Bu wrote:


Hi,

According to

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#timestamp,
the default Java bridging class for TIMESTAMP is
java.time.LocalDateTime. Is there a setting that can change it to use
java.sql.Timestamp instead?

Thanks,
Fanbin



--
Best regards!
Rui Li




[checkpoint] Does Flink support writing checkpoints to an external HDFS?

2020-02-14 Thread tao wang
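
Checkpoints can be written to an external HDFS by pointing the checkpoint directory at it,
for example (values are illustrative):

```
# flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
```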



[Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-14 Thread Niels Basjes
Hi,

I have test code (
https://github.com/nielsbasjes/yauaa/blob/v5.15/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L140
)
that writes a DataStream to a List<> using LocalCollectionOutputFormat to
verify if the pipeline did what it should do.

List result = new ArrayList<>(5);
testRecordDataSet
 .writeUsingOutputFormat(new LocalCollectionOutputFormat<>(result));
environment.execute();
assertEquals(2, result.size());


I was just now upgrading to Flink 1.10 and I found that
apparently writeUsingOutputFormat has now been deprecated.
The comment says (
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1066
 ):

* @deprecated Please use the {@link
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
explicitly using the
* {@link #addSink(SinkFunction)} method.

I'm not writing to a file at all. Looking at the API this
StreamingFileSink does not seem to fit what I'm doing.

What is in Flink 1.10 the correct way of writing a test to verify if the
output of my test run is valid?
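
For reference, a sketch of one way such an assertion can be written against the
experimental DataStreamUtils (whether this is the intended replacement is exactly the
open question; testRecordDataSet comes from the snippet above, and the element type
TestRecord is assumed):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStreamUtils;

// collect() triggers execution itself, so no separate environment.execute() is needed.
Iterator<TestRecord> it = DataStreamUtils.collect(testRecordDataSet);
List<TestRecord> result = new ArrayList<>();
it.forEachRemaining(result::add);
assertEquals(2, result.size());
```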

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: ParquetTableSource usage issue under the blink table planner

2020-02-14 Thread jun su
1. I found that with the flink table planner, ParquetTableSource shows the problem
described above in both stream and batch modes.
2. With the blink table planner there is no such problem, but the print method has an
encoding issue with Chinese characters.

I'm not sure whether this is a usage problem on my side; please help verify. (The literal
'没有这个值' in the query and output below means "this value does not exist".)

jun su wrote on Fri, Feb 14, 2020 at 6:30 PM:

> hi Jark Wu,
>
> Sorry, here are my code and results:
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment fbEnv = 
> ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
>
> String schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"parent_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"dst_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_pid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"windows_event_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dev_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_da\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tran_protocol\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"src_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"domain_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"operation_object\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"protocol\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"src_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_user\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"image\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_command_line\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"product\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_sp_ap_da_dp\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"receive_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"collector_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"src_ad\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rule_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"src_port\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_content\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file_hash\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_image\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dst_port\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_level\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_image\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"current_directory\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"user_account\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"command_line\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"host_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"lon
g\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}";
>
> ParquetTableSource parquetTableSource = ParquetTableSource
> .builder()
> .path("/Users/sujun/Downloads/edr/EDR")
> .forParquetSchema(new 
> AvroSchemaConverter().convert(org.apache.avro.Schema.parse(schema, true)))
> .build();
>
>
> Table source = fbTableEnv.fromTableSource(parquetTableSource);
> fbTableEnv.createTemporaryView("source",source);
>
> Table table = fbTableEnv.sqlQuery("select event_name from source where 
> event_name = '没有这个值'");
>
> fbTableEnv.toDataSet(table,Row.class).print();
>
> 没有这个值
> 没有这个值
> 没有这个值
> 没有这个值
> 没有这个值
> 没有这个值
> 没有这个值
>
>
> Jark Wu wrote on Fri, Feb 14, 2020 at 6:25 PM:
>
>> Hi Jun,
>>
>> Your image upload failed. You can upload it with an image hosting tool and paste the link here, or paste the text directly.
>>
>> Best,
>> Jark
>>
>> On Fri, 14 Feb 2020 at 18:16, jun su  wrote:
>>
>> > hi JingsongLee,
>> > I ran into a problem while testing ParquetTableSource: my data does not contain the value set in the where condition, but in the printed result,
>> > 

Re: ParquetTableSource usage issue under the blink table planner

2020-02-14 Thread Jark Wu
Hi Jun,

Your image upload failed. You can upload it with an image hosting tool and paste the link here, or paste the text directly.

Best,
Jark

On Fri, 14 Feb 2020 at 18:16, jun su  wrote:

> hi JingsongLee,
> I ran into a problem while testing ParquetTableSource: my data does not contain the value
> set in the where condition, but in the printed result that field has been assigned the
> value from the where condition directly.
>
> [image: image.png]
>
> JingsongLee wrote on Fri, Feb 14, 2020 at 5:05 PM:
>
>> Hi jun,
>>
>> The pushdown logic is shared between batch and streaming, so it should work just fine.
>>
>> Best,
>> Jingsong Lee
>>
>>
>> --
>> From:jun su 
>> Send Time: Friday, Feb 14, 2020 17:00
>> To: user-zh 
>> Subject: ParquetTableSource usage issue under the blink table planner
>>
>> Hello:
>> The official documentation states that the Blink Table Planner does not support BatchTableSource,
>>
>> and in the latest 1.10 code ParquetTableSource is still a BatchTableSource, so does that mean
>> ParquetTableSource does not yet support the blink table planner? If the existing
>> ParquetTableSource is changed to a StreamTableSource, will the pushdown logic have bugs?
>>
>


Re: ParquetTableSource usage issue under the blink table planner

2020-02-14 Thread jun su
hi JingsongLee,
I ran into a problem while testing ParquetTableSource: my data does not contain the value
set in the where condition, but in the printed result that field has been assigned the
value from the where condition directly.

[image: image.png]

JingsongLee wrote on Fri, Feb 14, 2020 at 5:05 PM:

> Hi jun,
>
> The pushdown logic is shared between batch and streaming, so it should work just fine.
>
> Best,
> Jingsong Lee
>
>
> --
> From:jun su 
> Send Time: Friday, Feb 14, 2020 17:00
> To: user-zh 
> Subject: ParquetTableSource usage issue under the blink table planner
>
> Hello:
> The official documentation states that the Blink Table Planner does not support BatchTableSource,
>
> and in the latest 1.10 code ParquetTableSource is still a BatchTableSource, so does that mean
> ParquetTableSource does not yet support the blink table planner? If the existing
> ParquetTableSource is changed to a StreamTableSource, will the pushdown logic have bugs?
>


Re: Batch reading from Cassandra. How to?

2020-02-14 Thread Lasse Nedergaard
Any good suggestions?

Lasse

Den tir. 11. feb. 2020 kl. 08.48 skrev Lasse Nedergaard <
lassenederga...@gmail.com>:

> Hi.
>
> We would like to do some batch analytics on our data set stored in
> Cassandra and are looking for an efficient way to load data from a single
> table. Not by key, but random 15%, 50% or 100%
> Data bricks has create an efficient way to load Cassandra data into Apache
> Spark and they are doing it by reading from the underlying SS tables to
> load in parallel.
> Do we have something similarly in Flink, or how is the most efficient way
> to load all, or many random data from a single Cassandra table into Flink?
>
> Any suggestions and/or recommendations is highly appreciated.
>
> Thanks in advance
>
> Lasse Nedergaard
>


Re:Re: image

2020-02-14 Thread 潘明文


select * from mykafka1

Error message:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.<init>(Lorg/apache/kafka/common/Metric;)V









On 2020-02-14 17:43:16, "Benchao Li" wrote:
>Mingwen,
>
>Your image did not show up in the email.
>You can upload the image to a third-party image host, or send your exception message directly as text.
>
>
>潘明文 wrote on Fri, Feb 14, 2020 at 5:31 PM:
>
>>
>>
>> CREATE TABLE mykafka1(name String) WITH (
>>'connector.type' = 'kafka',
>>'connector.version' = 'universal',
>>'connector.topic' = 'mysql_binlog',
>>'connector.properties.zookeeper.connect' = 'masternode1:2181',
>>'connector.properties.bootstrap.servers' = 'masternode1:9092',
>>'format.type' = 'csv',
>>'update-mode' = 'append'
>> );
>>
>>
>> The following error occurs: the method is missing. Is the version wrong? I am using flink-sql-connector-kafka_2.11-1.10.0.jar 
>> and flink-connector-kafka_2.11-1.10.0.jar
>>
>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


image

2020-02-14 Thread 潘明文




CREATE TABLE mykafka1(name String) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'mysql_binlog',
   'connector.properties.zookeeper.connect' = 'masternode1:2181',
   'connector.properties.bootstrap.servers' = 'masternode1:9092',
   'format.type' = 'csv',
   'update-mode' = 'append'
);



The following error occurs: the method is missing. Is the version wrong? I am using flink-sql-connector-kafka_2.11-1.10.0.jar 
and flink-connector-kafka_2.11-1.10.0.jar



ParquetTableSource usage issue under the blink table planner

2020-02-14 Thread jun su
Hello:
   The official documentation states that the Blink Table Planner does not support BatchTableSource,
and in the latest 1.10 code ParquetTableSource is still a BatchTableSource, so does that mean
ParquetTableSource does not yet support the blink table planner? If the existing
ParquetTableSource is changed to a StreamTableSource, will the pushdown logic have bugs?


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-14 Thread Till Rohrmann
Congratulations to everyone and a big thanks to our release managers!

Cheers,
Till

On Thu, Feb 13, 2020 at 2:41 PM Oytun Tez  wrote:

> 
>
> On Thu, Feb 13, 2020 at 2:26 AM godfrey he  wrote:
>
>> Congrats to everyone involved! Thanks, Yu & Gary.
>>
>> Best,
>> godfrey
>>
>> Yu Li wrote on Thu, Feb 13, 2020 at 12:57 PM:
>>
>>> Hi Kristoff,
>>>
>>> Thanks for the question.
>>>
>>> About Java 11 support, please allow me to quote from our release note
>>> [1]:
>>>
>>> Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka
>>> 0.8–0.11
>>> have not been tested with Java 11 because the respective projects did
>>> not provide
>>> Java 11 support at the time of the Flink 1.10.0 release
>>>
>>> Which is the main reason for us to still make our docker image based on
>>> JDK 8.
>>>
>>> Hope this answers your question.
>>>
>>> Best Regards,
>>> Yu
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html
>>>
>>>
>>> On Wed, 12 Feb 2020 at 23:43, KristoffSC 
>>> wrote:
>>>
 Hi all,
 I have a small question regarding 1.10

 Correct me if I'm wrong, but 1.10 should support Java 11 right?

 If so, then I noticed that docker images [1] referenced in [2] are still
 based on openjdk8 not Java 11.

 What's up with that?

 P.S.
 Congrats on releasing 1.10 ;)

 [1]

 https://github.com/apache/flink/blob/release-1.10/flink-container/docker/Dockerfile
 [2]

 https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html



 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>> --
>  --
>
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>