Re: Job Manager becomes unresponsive if the size of the session cluster grows

2019-07-17 Thread Biao Liu
Hi,

Based on your GC metrics, garbage collection seems to be fine. You could double-check the GC
log if you have it enabled; the GC log is more direct.
I'm not sure what's happening in your JobManager, but I'm pretty sure that
Flink can support far larger clusters than yours.

Have you checked the JobManager log file? Are there any suspicious
warnings or errors?
Have you tried any analysis tools, like jstack, to check the internal state of
the JobManager?

It's hard to do a deeper analysis based on the current information. It would
be helpful if you could provide more details.


On Thu, Jul 18, 2019 at 2:12 PM Prakhar Mathur wrote:

> Hi,
>
> We are using v1.6.2, and currently the number of TaskManagers is 70. We also have
> the GC metrics on a dashboard. The sum of
> Status.JVM.GarbageCollector.MarkSweepCompact.Time grouped by 1 min is
> somewhere between 75 and 125,
> and Status.JVM.GarbageCollector.MarkSweepCompact.Count is fixed at 10.
>
> On Thu, Jul 18, 2019 at 11:32 AM Biao Liu  wrote:
>
>> Hi Prakhar,
>>
>> Have you checked the garbage collection of the master?
>> Which version of Flink are you using? How many TaskManagers are in your
>> cluster?
>>
>>
>> On Thu, Jul 18, 2019 at 1:54 PM Prakhar Mathur wrote:
>>
>>> Hello,
>>>
>>> We have deployed multiple Flink clusters on Kubernetes with 1 replica
>>> of JobManager and multiple TaskManagers as per the requirement. Recently
>>> we have observed that on increasing the number of TaskManagers for a
>>> cluster, the JobManager becomes unresponsive. It stops sending statsd
>>> metrics for irregular intervals. The JobManager pod also keeps
>>> restarting because it stops responding to the liveness probe, which
>>> results in Kubernetes killing the pod. We tried increasing the resources
>>> given (CPU, RAM) but it didn't help.
>>>
>>> Regards
>>> Prakhar Mathur
>>> Product Engineer
>>> GO-JEK
>>>
>>


Re: Checkpoints timing out for no apparent reason

2019-07-17 Thread Congxian Qiu
Hi

The image did not show up. An incremental checkpoint includes: 1) flushing the memtable
to sst files; 2) taking a checkpoint of RocksDB; 3) snapshotting the metadata; 4) uploading
the needed sst files to remote storage. The first three steps are the sync part and
the fourth step is the async part. Could you please check whether the sync or
async part takes too long? For the sync part, you could
check the disk performance during the checkpoint; for the async part, you should
check the network performance and the S3 client.
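For reference, here is a minimal Scala sketch of how the checkpoint interval, the checkpoint
timeout and incremental RocksDB checkpoints are typically configured (the S3 path and the
concrete values are only placeholders, and the RocksDB state backend dependency is assumed
to be on the classpath):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

val env = StreamExecutionEnvironment.getExecutionEnvironment

// trigger a checkpoint every 60 s and allow up to 10 minutes before declaring it timed out
env.enableCheckpointing(60 * 1000)
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)

// the second argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true))

A longer timeout does not remove the root cause, but the checkpoint details then show more
clearly whether the sync part or the async part is the slow one.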

Best,
Congxian


On Wed, Jul 17, 2019 at 4:02 AM spoganshev wrote:

> We have an issue with a job when it occasionally times out while creating
> snapshots for no apparent reason:
>
> 
>
> Details:
> - Flink 1.7.2
> - Checkpoints are saved to S3 with presto
> - Incremental checkpoints are used
>
> What might be the cause of this issue? It feels like some internal s3
> client
> timeout issue, but I didn't find any configuration of such timeout.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Job Manager becomes unresponsive if the size of the session cluster grows

2019-07-17 Thread Biao Liu
Hi Prakhar,

Have you checked the garbage collection of the master?
Which version of Flink are you using? How many TaskManagers are in your
cluster?


On Thu, Jul 18, 2019 at 1:54 PM Prakhar Mathur wrote:

> Hello,
>
> We have deployed multiple Flink clusters on Kubernetes with 1 replica of
> JobManager and multiple TaskManagers as per the requirement. Recently we
> have observed that on increasing the number of TaskManagers for a cluster,
> the JobManager becomes unresponsive. It stops sending statsd metrics for
> irregular intervals. The JobManager pod also keeps restarting because
> it stops responding to the liveness probe, which results in Kubernetes
> killing the pod. We tried increasing the resources given (CPU, RAM) but it
> didn't help.
>
> Regards
> Prakhar Mathur
> Product Engineer
> GO-JEK
>


Re: yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Maxim Parkachov
Hi Haibo,

thanks for the tip, I almost forgot about max-attempts. I understand the
implications of running with one AM.

Maybe my question was phrased incorrectly, but what would be faster (with regard to
the downtime of each job):

1. In case of yarn-session: cancel all jobs with savepoints in parallel,
restart the yarn-session, then start all jobs from their savepoints in parallel.
2. In case of per-job mode: cancel all jobs with savepoints in parallel,
then start all jobs from their savepoints in parallel.

I want to optimise the standard situation where I deploy a new version of all my
jobs. My current impression is that jobs start faster in yarn-session mode.

Thanks,
Maxim.


On Thu, Jul 18, 2019 at 4:57 AM Haibo Sun  wrote:

> Hi, Maxim
>
> Regarding the concern on the first point:
> If HA and checkpointing are enabled, the AM (the application master, that is,
> the job manager you mentioned) will be restarted by YARN after it dies, and then
> the dispatcher will try to restore all the previously running jobs
> correctly. Note that the number of attempts is decided by the
> configurations "yarn.resourcemanager.am.max-attempts" and
> "yarn.application-attempts". The obvious difference between the session and
> per-job modes is that if a fatal error occurs in the AM, it will affect all
> jobs running in it, while in per-job mode it will only affect one job.
>
> You can look at this document to see how to configure HA for the Flink
> cluster on YARN:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
>  .
>
> Best,
> Haibo
>
> At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:
>
> Hi,
>
> I'm looking for advice on how to run Flink streaming jobs on a YARN cluster
> in a production environment. In a testing environment I tried both approaches
> with HA mode, namely yarn-session + multiple jobs vs cluster per job; both
> seem to work for my cases, with a slight preference for yarn-session mode to
> centrally manage credentials. I'm looking to run about 10 streaming jobs,
> mostly reading/writing from Kafka + Cassandra, with the following restrictions:
> 1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I
> have a concern here about what happens when the Job manager dies in session mode.
> 2. there are often network interruptions/slowdowns.
> 3. I'm trying to minimise the time to restart jobs to have processing that is as
> continuous as possible.
>
> Thanks in advance,
> Maxim.
>
>


Job Manager becomes unresponsive if the size of the session cluster grows

2019-07-17 Thread Prakhar Mathur
Hello,

We have deployed multiple Flink clusters on Kubernetes with 1 replica of
JobManager and multiple TaskManagers as per the requirement. Recently we
have observed that on increasing the number of TaskManagers for a cluster,
the JobManager becomes unresponsive. It stops sending statsd metrics for
irregular intervals. The JobManager pod also keeps restarting because
it stops responding to the liveness probe, which results in Kubernetes
killing the pod. We tried increasing the resources given (CPU, RAM) but it
didn't help.

Regards
Prakhar Mathur
Product Engineer
GO-JEK


Re: Cannot access the data from Hive-Tables in Blink

2019-07-17 Thread Bowen Li
Hi Yebgenya,

This is caused by a Hive version mismatch: you are either not using a supported
Hive version (double-check that your Hive version is supported by Blink), or not
specifying the right version in the YAML config (e.g. you use 2.3.4 but specify
it as 1.2.1).

Bowen

On Tue, Jul 16, 2019 at 11:22 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m trying to use BigBench queries on Blink in Cloudera. I have defined a
> catalog in YAML-file and can see my Hive-tables in SQL-client.
>
> But I can’t see the data of the tables, or run any other SQL-Query. I get
> this error:
>
>
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.hive.shaded.org.apache.thrift.TApplicationException:
> Invalid method name: 'get_table_req'
>
>
>
>
>
> How can I get rid of this error?
>
>
>
> Regards
>
> Yebgenya
>


Re:Re: Job leak in attached mode (batch scenario)

2019-07-17 Thread Haibo Sun


There should be no JIRA ticket for this requirement yet. If you have a strong need for
this feature, you can create one. In addition, you can also go to
issues.apache.org and search with keywords to confirm whether a relevant
JIRA ticket already exists.


Best,
Haibo

At 2019-07-18 10:31:22, "qi luo"  wrote:
Thanks Haibo for the response!


Is there any community issue or plan to implement a heartbeat mechanism between
Dispatcher and Client? If not, should I create one?


Regards,
Qi



On Jul 17, 2019, at 10:19 AM, Haibo Sun  wrote:


Hi, Qi


As far as I know, there is no such mechanism now. To achieve this, I think it 
may be necessary to add a REST-based heartbeat mechanism between Dispatcher and 
Client. At present, perhaps you can add a monitoring service to deal with these 
residual Flink clusters.


Best,
Haibo

At 2019-07-16 14:42:37, "qi luo"  wrote:
Hi guys,


We run thousands of Flink batch jobs every day. The batch jobs are submitted in
attached mode, so we can know from the client when a job has finished and then
take further actions. To respond to user abort actions, we submit the jobs with
"--shutdownOnAttachedExit" so the Flink cluster can be shut down when the client
exits.


However, in some cases when the Flink client exits abnormally (such as on an OOM),
the shutdown signal will not be sent to the Flink cluster, causing a “job leak”.
The lingering Flink job will continue to run and never end, consuming a large
amount of resources and even producing unexpected results.


Does Flink have any mechanism to handle such a scenario (e.g. Spark has cluster
mode, where the driver runs on the client side, so the job will exit when the
client exits)? Any idea will be very much appreciated!


Thanks,
Qi



Re:yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Haibo Sun
Hi, Maxim


Regarding the concern on the first point:
If HA and checkpointing are enabled, the AM (the application master, that is,
the job manager you mentioned) will be restarted by YARN after it dies, and then the
dispatcher will try to restore all the previously running jobs correctly. Note
that the number of attempts is decided by the configurations
"yarn.resourcemanager.am.max-attempts" and "yarn.application-attempts". The
obvious difference between the session and per-job modes is that if a fatal
error occurs in the AM, it will affect all jobs running in it, while in per-job
mode it will only affect one job.



You can look at this document to see how to configure HA for the Flink cluster 
on YARN: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
 .


Best,
Haibo


At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:

Hi,


I'm looking for advice on how to run Flink streaming jobs on a YARN cluster in a
production environment. In a testing environment I tried both approaches with HA
mode, namely yarn-session + multiple jobs vs cluster per job; both seem to
work for my cases, with a slight preference for yarn-session mode to centrally
manage credentials. I'm looking to run about 10 streaming jobs, mostly
reading/writing from Kafka + Cassandra, with the following restrictions:
1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I have
a concern here about what happens when the Job manager dies in session mode.

2. there are often network interruptions/slowdowns.
3. I'm trying to minimise the time to restart jobs to have processing that is as
continuous as possible.


Thanks in advance,
Maxim.


Re: [DISCUSS] Create a Flink ecosystem website

2019-07-17 Thread Congxian Qiu
Robert and Daryl, thanks for the great work. I tried the website and filed
some issues on GitHub.
Best,
Congxian


On Wed, Jul 17, 2019 at 11:28 PM Robert Metzger wrote:

> Hey all,
>
> Daryl and I have great news to share. We are about to finish adding the
> basic features to the ecosystem page.
> We are at a stage where it is ready to be reviewed and made public.
>
> You can either check out a development instance of the ecosystem page
> here: https://flink-ecosystem-demo.flink-resources.org/
> Or you run it locally, with the instructions from the README.md:
> https://github.com/sorahn/flink-ecosystem
>
> Please report all issues you find here:
> https://github.com/sorahn/flink-ecosystem/issues or in this thread.
>
> The next steps in this project are the following:
> - We fix all issues reported through this testing
> - We set up the site on the INFRA resources Becket has secured [1], do
> some further testing (including email notifications) and pre-fill the page
> with some packages.
> - We set up a packages.flink.apache.org or flink.apache.org/packages
> domain
> - We announce the packages through a short blog post
>
> Happy testing!
>
> Best,
> Robert
>
> [1] https://issues.apache.org/jira/browse/INFRA-18010
>
>
> On Thu, Apr 25, 2019 at 6:23 AM Becket Qin  wrote:
>
>> Thanks for the update, Robert. Looking forward to the website. If there
>> is already a list of software we need to run the website, we can ask Apache
>> infra team to prepare the VM for us, as that may also take some time.
>>
>> On Wed, Apr 24, 2019 at 11:57 PM Robert Metzger 
>> wrote:
>>
>>> Hey all,
>>>
>>> quick update on this project: The frontend and backend code have been
>>> put together into this repository:
>>> https://github.com/sorahn/flink-ecosystem
>>> We also just agreed on an API specification, and will now work on
>>> finishing the backend.
>>>
>>> It will probably take a few more weeks for this to finish, but we are
>>> making progress :)
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Mon, Apr 15, 2019 at 11:18 AM Robert Metzger 
>>> wrote:
>>>
 Hey Daryl,

 thanks a lot for posting a link to this first prototype on the mailing
 list! I really like it!

 Becket: Our plan forward is that Congxian is implementing the backend
 for the website. He has already started with the work, but needs at least
 one more week.


 [Re-sending this email because the first one was blocked on dev@f.a.o]


 On Mon, Apr 15, 2019 at 7:59 AM Becket Qin 
 wrote:

> Hi Daryl,
>
> Thanks a lot for the update. The site looks awesome! This is great
> progress. I really like the conciseness of the GUI.
>
> One minor suggestion is that for the same library, there might be
> multiple versions compatible with different Flink versions. It would be
> good to show that somewhere in the project page as it seems important to
> the users.
>
> BTW, will you share the plan to move forward? Would additional hands
> help?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Apr 13, 2019 at 7:10 PM Daryl Roberts 
> wrote:
>
>> > Shall we add a guide page to show people how to publish their
>> projects to the website? The exact rules can be discussed and drafted in 
>> a
>> separate email thread IMO
>>
>> This is a good idea. (Both the guide, and the separate thread.) I think
>> once there is an actual package in place we’ll be in a much better position
>> to discuss this.
>>
>> > The "Log in with Github" link doesn't seem to work yet. Will it
>> only allow login for admins and publishers, or for everyone?
>>
>> Correct, all the oauth stuff requires a real server. We are currently
>> just faking everything.
>>
>> I will add a mock-login page (username/password that just accepts
>> anything and displays whatever username you type in) so we can see the
>> add-comment field and add-packages page once they exist.
>>
>>
>>
>>


ValueState in SessionWindows

2019-07-17 Thread 艾毅
  I implemented a class that extends MergingWindowAssigner. In the
trigger, I use ValueState. When windows are merged, I want to get the
ValueState of the merged windows, but right now I can't get the
ValueState.


[no subject]

2019-07-17 Thread tangkailin
Hello,
I am trying to use a HashMap in the window function of my Flink job. If the
parallelism changes, is this HashMap still a singleton? Shouldn't I do
something similar here?


Sent from Mail for Windows 10



Re: Job leak in attached mode (batch scenario)

2019-07-17 Thread qi luo
Thanks Haibo for the response!

Is there any community issue or plan to implement a heartbeat mechanism between
Dispatcher and Client? If not, should I create one?

Regards,
Qi

> On Jul 17, 2019, at 10:19 AM, Haibo Sun  wrote:
> 
> Hi, Qi
> 
> As far as I know, there is no such mechanism now. To achieve this, I think it 
> may be necessary to add a REST-based heartbeat mechanism between Dispatcher 
> and Client. At present, perhaps you can add a monitoring service to deal with 
> these residual Flink clusters.
> 
> Best,
> Haibo
> 
> At 2019-07-16 14:42:37, "qi luo"  wrote:
> Hi guys,
> 
> We run thousands of Flink batch jobs every day. The batch jobs are submitted
> in attached mode, so we can know from the client when a job has finished and
> then take further actions. To respond to user abort actions, we submit the
> jobs with "--shutdownOnAttachedExit" so the Flink cluster can be shut down when
> the client exits.
> 
> However, in some cases when the Flink client exits abnormally (such as on an OOM),
> the shutdown signal will not be sent to the Flink cluster, causing a “job
> leak”. The lingering Flink job will continue to run and never end, consuming
> a large amount of resources and even producing unexpected results.
> 
> Does Flink have any mechanism to handle such a scenario (e.g. Spark has cluster
> mode, where the driver runs on the client side, so the job will exit when the
> client exits)? Any idea will be very much appreciated!
> 
> Thanks,
> Qi



Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
Yes, even the delimiter can be replaced. I still have to test what happens if the
data itself has a comma in it.

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2.toString.replaceAll(",","~"))
  
.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
FileSystem.WriteMode.OVERWRITE)


On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Amazing, all issues resolved in one go, thanks Cheng. One issue though: I
> can't write the .map(_._2) output to CSV; it looks like that isn't supported right
> now, so it has to be a text file.
>
> below is a full code if someone wants in Scala.
>
> Git Code is here:-
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import 
> com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
>   /*
>   extends RetractStreamTableSink[Row]
>   override def configure(strings: Array[String], typeInformations: 
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: 
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] 
> = super.getOutputType
>
>   override def getRecordType: TypeInformation[Row] = ???
>
>*/
>
>   def main(args: Array[String]): Unit = {
>
>
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(1)
>
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), 
> consumerConfig))
>
> val mapFunction: MapFunction[String, (String, String, String, String, 
> String, String, String, String, String, String)] =
>   new MapFunction[String, (String, String, String, String, String, 
> String, String, String, String, String)]() {
>
> override def map(s: String): (String, String, String, String, String, 
> String, String, String, String, String) = {
>
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.getMerch_long
>
>   //println(csvData)
>
>   val p: Array[String] = csvData.split(",")
>   var cc_num: String = p(0)
>   var first: String = p(1)
>   var last: String = p(2)
>   var trans_num: String = p(3)
>   var trans_time: String = p(4)
>   var category: String = p(5)
>   var merchant: String = p(6)
>   var amt: String = p(7)
>   var merch_lat: String = p(8)
>   var merch_long: String = p(9)
>
>   val creationDate:Time = new Time(System.currentTimeMillis())
>   return (cc_num, first, last, trans_num, trans_time, category, 
> merchant, amt, merch_lat, merch_long)
> }
>   }
>
>

Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
Amazing, all issues resolved in one go, thanks Cheng. One issue though: I
can't write the .map(_._2) output to CSV; it looks like that isn't supported right
now, so it has to be a text file.

below is the full code if someone wants it in Scala.

Git Code is here:-
https://github.com/kali786516/FlinkStreamAndSql

package com.aws.examples.kinesis.consumer.transactionExampleScala

import java.util.Properties
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import java.sql.{DriverManager, Time}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.{FileSystem, Path}

object TransactionScalaTest {

  /*
  extends RetractStreamTableSink[Row]
  override def configure(strings: Array[String], typeInformations:
Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]]
= ???

  override def getFieldNames: Array[String] = ???

  override def getFieldTypes: Array[TypeInformation[_]] = ???

  override def emitDataStream(dataStream:
DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???

  override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean,
Row]] = super.getOutputType

  override def getRecordType: TypeInformation[Row] = ???

   */

  def main(args: Array[String]): Unit = {



// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.enableCheckpointing(1)

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(),
consumerConfig))

val mapFunction: MapFunction[String, (String, String, String,
String, String, String, String, String, String, String)] =
  new MapFunction[String, (String, String, String, String, String,
String, String, String, String, String)]() {

override def map(s: String): (String, String, String, String,
String, String, String, String, String, String) = {

  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num + "," +
data.getFirst + "," +
data.getLast + "," +
data.getTrans_num + "," +
data.getTrans_time + "," +
data.getCategory + "," +
data.getMerchant + "," +
data.getAmt + "," +
data.getMerch_lat + "," +
data.getMerch_long

  //println(csvData)

  val p: Array[String] = csvData.split(",")
  var cc_num: String = p(0)
  var first: String = p(1)
  var last: String = p(2)
  var trans_num: String = p(3)
  var trans_time: String = p(4)
  var category: String = p(5)
  var merchant: String = p(6)
  var amt: String = p(7)
  var merch_lat: String = p(8)
  var merch_long: String = p(9)

  val creationDate:Time = new Time(System.currentTimeMillis())
  return (cc_num, first, last, trans_num, trans_time,
category, merchant, amt, merch_lat, merch_long)
}
  }


val data = kinesis.map(mapFunction)

tEnv.registerDataStream("transactions", data,
'cc_num,'first_column,'last_column,'trans_num,
  
'trans_time,'category_column,'merchant_column,'amt_column,'merch_lat,'merch_long)
//tEnv.registerDataStream("transactions", data,
"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num') "
val table = tEnv.sqlQuery(query)

table
  .toRetractStream(Typ

Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-17 Thread Richard Deurwaarder
Hello,

I've got a problem with our Flink cluster where the jobmanager is not
starting up anymore, because it tries to download a non-existent (blob) file
from the ZooKeeper storage dir.

We're running flink 1.8.0 on a kubernetes cluster and use the google
storage connector [1] to store checkpoints, savepoints and zookeeper data.

When I noticed the jobmanager was having problems, it was in a crash loop
throwing file-not-found exceptions [2]:
Caused by: java.io.FileNotFoundException: Item not found:
some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
If you enabled STRICT generation consistency, it is possible that the live
version is still available but the intended generation is deleted.

I looked in the blob directory and I can only find
/recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482. I've tried to
fiddle around in ZooKeeper to see if I could find anything [3], but I do
not really know what to look for.

How could this have happened and how should I recover the job from this
situation?

Thanks,

Richard

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
[2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
[3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a


Re: Providing external files to flink classpath

2019-07-17 Thread Vishwas Siravara
Does the -yt option work for a standalone cluster without a dedicated resource
manager? The property file is read as a file by one of the dependencies inside
my application, so I can't really use ParameterTool to parse the
config file.

Thanks,
Vishwas

On Fri, Jun 28, 2019 at 11:08 PM Yun Tang  wrote:

> Hi Vishwas
>
>
>    1. You could use '-yt' to ship the specified files to the classpath;
>    please refer to [1] for more details.
>    2. If the properties are only loaded on the client side before executing
>    the application, you could let your application just read the local
>    property file directly. Flink supports loading properties via the
>    ParameterTool [2]; a short sketch follows below.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
> [2]
> https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
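> A minimal Scala sketch of option 2 (the file path and key names here are just placeholders):
>
> import org.apache.flink.api.java.utils.ParameterTool
>
> // load key/value pairs from a local properties file
> val params = ParameterTool.fromPropertiesFile("/path/to/app.properties")
> val endpoint = params.get("service.endpoint", "http://localhost:8080")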
>
> Best
> Yun Tang
>
> --
> *From:* Vishwas Siravara 
> *Sent:* Saturday, June 29, 2019 0:43
> *To:* user
> *Subject:* Providing external files to flink classpath
>
> Hi ,
> I am trying to add external property files to the Flink classpath for
> my application. These files are not part of the fat jar. I put them
> under the lib folder but Flink can't find them. How can I manage
> external property files that need to be read by Flink?
>
> Thanks,
> Vishwas
>


Flink and CDC

2019-07-17 Thread Flavio Pompermaier
Hi to all,
I'd like to know whether there is an example of how to leverage
Debezium as a CDC source to feed a Flink Table (from MySQL, for example).

Best,
Flavio


Re: Questions about user doc.

2019-07-17 Thread Biao Liu
Hi Vishwas,

> I am guessing this means that Flink executes successive tasks from
different pipelines successively right ?

As the document describes, "Note that Flink often executes successive tasks
concurrently: For Streaming programs, that happens in any case, but also
for batch programs, it happens frequently.". So I think "successively" is
not accurate, at least for streaming jobs.

> I also don't fully understand Intermediate result partition and
Intermediate dataset , why are there two boxes in the diagram for
intermediate result after the first execution job vertex ? Is there any
more docs I can read to clearly understand these diagrams, thanks for your
help.

1. The "Intermediate dataset" is a logical concept described in the
JobGraph, while the "Intermediate result partition" is more of a physical
concept described in the ExecutionGraph. The "Intermediate result partition" is
the parallel version of the "Intermediate dataset".
2. This document is under the "Internals" part. It refers to some internal
implementations, so there may not be as much documentation as you would wish. There are
links for the critical concepts in this document; they point to the Flink
GitHub repository. Sometimes the code is the best documentation :)


On Wed, Jul 17, 2019 at 1:40 PM Vishwas Siravara wrote:

> Hey guys,
> In this document :
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>  ,
> there is a line in the beginning of the scheduling section which says that
> : "A pipeline consists of multiple successive tasks, such as the *n-th* 
> parallel
> instance of a MapFunction together with the *n-th* parallel instance of a
> ReduceFunction. Note that Flink often executes successive tasks
> concurrently:"
>
> I am guessing this means that Flink executes successive tasks from
> different pipelines successively right ?
>
> I also don't fully understand Intermediate result partition and
> Intermediate dataset , why are there two boxes in the diagram for
> intermediate result after the first execution job vertex ? Is there any
> more docs I can read to clearly understand these diagrams, thanks for your
> help.
>
> Thanks,
> Vishwas
>


yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Maxim Parkachov
Hi,

I'm looking for advice on how to run Flink streaming jobs on a YARN cluster
in a production environment. In a testing environment I tried both approaches
with HA mode, namely yarn-session + multiple jobs vs cluster per job; both
seem to work for my cases, with a slight preference for yarn-session mode to
centrally manage credentials. I'm looking to run about 10 streaming jobs,
mostly reading/writing from Kafka + Cassandra, with the following restrictions:
1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I
have a concern here about what happens when the Job manager dies in session mode.
2. there are often network interruptions/slowdowns.
3. I'm trying to minimise the time to restart jobs to have processing that is as
continuous as possible.

Thanks in advance,
Maxim.


Re: Automatic deployment of new version of streaming stateful job

2019-07-17 Thread Maxim Parkachov
Hi Marc,

thanks a lot for the tool. Unfortunately, I could not directly use it, but I
will take a couple of ideas from it and implement my own script, roughly along the lines sketched below.
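The core of such a script, sketched here with scala.sys.process (the job id, the savepoint
directory, the savepoint path and the jar name are placeholders to adapt, and the flink binary
is assumed to be on the PATH):

import scala.sys.process._

// 1) cancel the running job and take a savepoint; the CLI prints the savepoint path
val jobId = "<job-id>"
Seq("flink", "cancel", "-s", "hdfs:///flink/savepoints", jobId).!

// 2) start the new job version from the savepoint path reported by the cancel command
Seq("flink", "run", "-d", "-s", "hdfs:///flink/savepoints/savepoint-xxxxxx", "my-job-new-version.jar").!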

Nevertheless, I'm really surprised that such functionality doesn't exist
out of the box.

Regards,
Maxim.

On Tue, Jul 16, 2019 at 9:22 AM Marc Rooding  wrote:

> Hi Maxim
>
> You could write a script yourself which triggers the cancel with savepoint
> and then starts a new version using the savepoint that was created during
> the cancel.
>
> However, I’ve built a tool that allows you to perform these steps more
> easily: https://github.com/ing-bank/flink-deployer. The deployer will
> allow you to deploy or upgrade your jobs. All you need to do is integrate
> it into your CI/CD.
>
> Kind regards
>
> Marc
> On 16 Jul 2019, 02:46 +0200, Maxim Parkachov ,
> wrote:
>
> Hi,
>
> I'm trying to bring my first stateful streaming Flink job to production
> and have trouble understanding how to integrate it with a CI/CD pipeline. I
> can cancel the job with a savepoint, but in order to start a new version of
> the application I need to specify the savepoint path manually?
>
> So, basically my question is: what is the best practice for automatically
> restarting or deploying a new version of a stateful streaming application?
> Every tip is greatly appreciated.
>
> Thanks,
> Maxim.
>
>


Re: [DISCUSS] Create a Flink ecosystem website

2019-07-17 Thread Robert Metzger
Hey all,

Daryl and I have great news to share. We are about to finish adding the
basic features to the ecosystem page.
We are at a stage where it is ready to be reviewed and made public.

You can either check out a development instance of the ecosystem page here:
https://flink-ecosystem-demo.flink-resources.org/
Or you run it locally, with the instructions from the README.md:
https://github.com/sorahn/flink-ecosystem

Please report all issues you find here:
https://github.com/sorahn/flink-ecosystem/issues or in this thread.

The next steps in this project are the following:
- We fix all issues reported through this testing
- We set up the site on the INFRA resources Becket has secured [1], do some
further testing (including email notifications) and pre-fill the page with
some packages.
- We set up a packages.flink.apache.org or flink.apache.org/packages domain
- We announce the packages through a short blog post

Happy testing!

Best,
Robert

[1] https://issues.apache.org/jira/browse/INFRA-18010


On Thu, Apr 25, 2019 at 6:23 AM Becket Qin  wrote:

> Thanks for the update, Robert. Looking forward to the website. If there is
> already a list of software we need to run the website, we can ask Apache
> infra team to prepare the VM for us, as that may also take some time.
>
> On Wed, Apr 24, 2019 at 11:57 PM Robert Metzger 
> wrote:
>
>> Hey all,
>>
>> quick update on this project: The frontend and backend code have been put
>> together into this repository: https://github.com/sorahn/flink-ecosystem
>> We also just agreed on an API specification, and will now work on
>> finishing the backend.
>>
>> It will probably take a few more weeks for this to finish, but we are
>> making progress :)
>>
>> Best,
>> Robert
>>
>>
>> On Mon, Apr 15, 2019 at 11:18 AM Robert Metzger 
>> wrote:
>>
>>> Hey Daryl,
>>>
>>> thanks a lot for posting a link to this first prototype on the mailing
>>> list! I really like it!
>>>
>>> Becket: Our plan forward is that Congxian is implementing the backend
>>> for the website. He has already started with the work, but needs at least
>>> one more week.
>>>
>>>
>>> [Re-sending this email because the first one was blocked on dev@f.a.o]
>>>
>>>
>>> On Mon, Apr 15, 2019 at 7:59 AM Becket Qin  wrote:
>>>
 Hi Daryl,

 Thanks a lot for the update. The site looks awesome! This is great
 progress. I really like the conciseness of the GUI.

 One minor suggestion is that for the same library, there might be
 multiple versions compatible with different Flink versions. It would be
 good to show that somewhere in the project page as it seems important to
 the users.

 BTW, will you share the plan to move forward? Would additional hands
 help?

 Thanks,

 Jiangjie (Becket) Qin

 On Sat, Apr 13, 2019 at 7:10 PM Daryl Roberts 
 wrote:

> > Shall we add a guide page to show people how to publish their
> projects to the website? The exact rules can be discussed and drafted in a
> separate email thread IMO
>
> This is a good idea. (Both the guide, and the separate thread.) I think
> once there is an actual package in place we’ll be in a much better position
> to discuss this.
>
> > The "Log in with Github" link doesn't seem to work yet. Will it only
> allow login for admins and publishers, or for everyone?
>
> Correct, all the oauth stuff requires a real server. We are currently
> just faking everything.
>
> I will add a mock-login page (username/password that just accepts
> anything and displays whatever username you type in) so we can see the
> add-comment field and add-packages page once they exist.
>
>
>
>


Re: Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi Chesnay,

Yes, but the actual use case needs to support more than 2 streams, so if I go
down the Either route then I have arbitrarily sized nested Eithers, i.e.
Either<Either<A, B>, C> etc., which gets pretty messy very quickly.

Many thanks,

John

Sent from my iPhone

On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote:


Have you looked at org.apache.flink.types.Either? If you'd wrap all elements in 
both streams before the union you should be able to join them properly.

On 17/07/2019 14:18, John Tipper wrote:
Hi All,


Can I union/join 2 streams containing generic classes, where each stream has a 
different parameterised type? I'd like to process the combined stream of values 
as a single raw type, casting to a specific type for detailed processing, based 
on some information in the type that will allow me to safely cast to the 
specific type.

I can't share my exact code, but the below example shows the sort of thing I 
want to do.

So, as an example, given the following generic type:

class MyGenericContainer extends Tuple3 {
...

private final String myString;

private final IN value;

private final Class clazz; // created by constructor

private SomeOtherClass someOtherClass;

...

}


and 2 streams, I'd like to be able to do something like:

DataStream> stream1 = ...
DataStream> stream2 = ...



DataStream<...> merged = stream1.union(stream2).process(new 
MyProcessFunction());



// within an operator, such as a MyProcessFunction:

MyGenericContainer container = raw generic container passed to function;

Object rawValue = container.getValue();

performProcessing((container.getClazz())rawValue); // safely cast rawValue


However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s). Otherwise 
the type has to be specified explicitly using type information.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)

at 
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)


If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
.process(...)

.returns(new TypeHint() {})


then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
TypeHint is using a generic variable.This is not supported, generic types must 
be fully specified for the TypeHint.


Is this sort of thing supported or is there another way of joining multiple 
streams into a single stream, where each stream object will have a specific 
type of a common generic type?


Many thanks,

John




Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
Question 1:-

I did try the map function and ended up having an issue (
https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
)

I am trying to convert a Tuple2[Boolean, Row] to Row using a map function, and I am
getting this error asking me for InferedR. What is InferedR in Flink?

  val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
  override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
t.f1
  }
  /*override def map(t: tuple.Tuple2[Boolean, Row], collector:
Collector[Object]): Unit = {
collector.collect(t.f1)
  }
*/
}

tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).map(mymapFunction)
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  FileSystem.WriteMode.OVERWRITE,"\n","|")

and when I try that, I get a different type of error:

Error:(143, 74) type mismatch;
 found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
 required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
  tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)

Question 2:-
I don't have any source data issue; reproducing this issue for testing
is simple:

create a Kinesis stream,
run the producer:
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

then run the consumer:
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala

Thanks
Sri







On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng  wrote:

> Hi Sri,
>
> Question1:
> You can use a map to filter out the "true" flag, i.e., ds.map(_._2).
> Note, it's ok to remove the "true" flag for distinct as it does not
> generate updates. For other queries that contain updates, such as a non-window
> group by, we should not filter the flag or the result will not be correct.
>
> Question 2:
> I can't reproduce this problem in my local environment. Maybe there is
> something wrong with the source data?
>
> Best, Hequn
>
> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> windows for question 1 or question 2 or both ?
>>
>> Thanks
>> Sri
>>
>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala 
>> wrote:
>>
>>> Looks like you need a window
>>>
>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Hi All,

 I am trying to write toRetractStream output to CSV, which is kind of working ok,
 but I get extra values like "true" before my output data values.

 Question 1:-
 I don't want "true" in my output data; how do I achieve this?

 Screenshot attached.

 Question 2:-
 In the output file (CSV) I am missing data in the last line; is the
 toRetractStream closing before writing to the file?

 Screenshot attached.

 Code:-

 val data = kinesis.map(mapFunction)
 tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
 val query = "SELECT distinct 
 cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
  FROM transactions where cc_num not in ('cc_num')"
 val table = tEnv.sqlQuery(query)
 tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
   
 .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
 FileSystem.WriteMode.OVERWRITE,"\n","|")



 --
 Thanks & Regards
 Sri Tummala


>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread Hequn Cheng
Hi Sri,

Question1:
You can use a map to filter out the "true" flag, i.e., ds.map(_._2).
Note, it's ok to remove the "true" flag for distinct as it does not
generate updates. For other queries that contain updates, such as a non-window
group by, we should not filter the flag or the result will not be correct.
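For example, a minimal sketch in the style of your snippet (the output path is just a placeholder):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.core.fs.FileSystem

table
  .toRetractStream(TypeInformation.of(classOf[Row]))  // DataStream[(Boolean, Row)]
  .map(_._2.toString)                                 // drop the add/retract flag, keep only the Row
  .writeAsText("/tmp/retractOut", FileSystem.WriteMode.OVERWRITE)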

Question 2:
I can't reproduce this problem in my local environment. Maybe there is
something wrong with the source data?

Best, Hequn

On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> windows for question 1 or question 2 or both ?
>
> Thanks
> Sri
>
> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala 
> wrote:
>
>> Looks like you need a window
>>
>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to write toRetractStream output to CSV, which is kind of working ok,
>>> but I get extra values like "true" before my output data values.
>>>
>>> Question 1:-
>>> I don't want "true" in my output data; how do I achieve this?
>>>
>>> Screenshot attached.
>>>
>>> Question 2:-
>>> In the output file (CSV) I am missing data in the last line; is the
>>> toRetractStream closing before writing to the file?
>>>
>>> Screenshot attached.
>>>
>>> Code:-
>>>
>>> val data = kinesis.map(mapFunction)
>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>> val query = "SELECT distinct 
>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>>  FROM transactions where cc_num not in ('cc_num')"
>>> val table = tEnv.sqlQuery(query)
>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>   
>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>> FileSystem.WriteMode.OVERWRITE,"\n","|")
>>>
>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

2019-07-17 Thread Hequn Cheng
Hi Sri,

For Scala jobs, we should import the corresponding Scala environment and
DataStream classes, e.g.:

import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment

See the example here [1].
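A minimal, self-contained sketch of how the pieces fit together with the Scala table
environment (the table name, field name and query below are just placeholders):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a trivial stream as a table
val words: DataStream[String] = env.fromElements("a", "b", "c")
tEnv.registerDataStream("words", words, 'word)

val table = tEnv.sqlQuery("SELECT word FROM words WHERE word <> 'b'")

// with the Scala table environment this yields a Scala DataStream[(Boolean, Row)]
tEnv.toRetractStream[Row](table).print()

env.execute("scala-table-env-sketch")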

Best,
Hequn

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala


On Tue, Jul 16, 2019 at 11:03 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Is this a bug in Flink Scala?
>
> Full code and Maven POM:-
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.lang
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
> SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.sinks.TableSink
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.api.java.tuple
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.sinks.AppendStreamTableSink
> import org.apache.flink.table.sinks.RetractStreamTableSink
> import org.apache.flink.api.java.DataSet
>
>
>
> object KinesisConsumer extends RetractStreamTableSink[Row] {
>
>   override def configure(strings: Array[String], typeInformations: 
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: 
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, 
> Row]] = super.getOutputType()
>
>   override def getRecordType: TypeInformation[Row] = ???
>
>
>   def main(args: Array[String]): Unit = {
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = 
> TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
> consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, String, 
> String, String, String, String, String, String, String]] =
>   new MapFunction[String, Tuple10[String, String, String, String, String, 
> String, String, String, String, String]]() {
>
> override def map(s: String): Tuple10[String, String, String, String, 
> String, String, String, String, String, String] = {
>
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.get

Re: Does Flink support raw generic types in a merged stream?

2019-07-17 Thread Chesnay Schepler
Have you looked at org.apache.flink.types.Either? If you'd wrap all 
elements in both streams before the union you should be able to join 
them properly.
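A minimal sketch of that idea in Scala (using Scala's own Either here, which plays the same
role as org.apache.flink.types.Either does in the Java API; String and Long stand in for the
real container types):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-ins for the two differently parameterised streams
val lefts: DataStream[String] = env.fromElements("a", "b")
val rights: DataStream[Long]  = env.fromElements(1L, 2L)

// wrap both sides into one concrete type before the union
val merged: DataStream[Either[String, Long]] =
  lefts.map(v => Left(v): Either[String, Long])
    .union(rights.map(v => Right(v): Either[String, Long]))

// downstream, pattern matching recovers the concrete type of each element
merged.map {
  case Left(s)  => s"left: $s"
  case Right(l) => s"right: $l"
}.print()

env.execute("either-union-sketch")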



On 17/07/2019 14:18, John Tipper wrote:

Hi All,

Can I union/join 2 streams containing generic classes, where each 
stream has a different parameterised type? I'd like to process the 
combined stream of values as a single raw type, casting to a specific 
type for detailed processing, based on some information in the type 
that will allow me to safely cast to the specific type.


I can't share my exact code, but the below example shows the sort of 
thing I want to do.


So, as an example, given the following generic type:

class MyGenericContainer extends Tuple3 {
    ...
    private final String myString;
    private final IN value;
    private final Class clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}

and 2 streams, I'd like to be able to do something like:

DataStream> stream1 = ...
DataStream> stream2 = ...
DataStream<...> merged = stream1.union(stream2).process(new 
MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast 
rawValue


However, I get an error when I do this:

Caused by: 
org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type 
extraction currently supports types with generic variables only in 
cases where all variables in the return type can be deduced from the 
input type(s). Otherwise the type has to be specified explicitly using 
type information.
    at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133) 

    at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853) 

    at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803) 

    at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587) 

    at 
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633) 


If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint() {})

then I get a different exception:

Exception in thread "main" 
org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a 
generic variable.This is not supported, generic types must be fully 
specified for the TypeHint.


Is this sort of thing supported or is there another way of joining 
multiple streams into a single stream, where each stream object will 
have a specific type of a common generic type?



Many thanks,

John






Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi All,


Can I union/join 2 streams containing generic classes, where each stream has a 
different parameterised type? I'd like to process the combined stream of values 
as a single raw type, casting to a specific type for detailed processing, based 
on some information in the type that will allow me to safely cast to the 
specific type.

I can't share my exact code, but the below example shows the sort of thing I 
want to do.

So, as an example, given the following generic type:

class MyGenericContainer extends Tuple3 {
...
private final String myString;
private final IN value;
private final Class clazz; // created by constructor
private SomeOtherClass someOtherClass;
...
}

and 2 streams, I'd like to be able to do something like:

DataStream> stream1 = ...
DataStream> stream2 = ...

DataStream<...> merged = stream1.union(stream2).process(new 
MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast rawValue

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s). Otherwise 
the type has to be specified explicitly using type information.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
at 
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
.process(...)
.returns(new TypeHint() {})

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
TypeHint is using a generic variable.This is not supported, generic types must 
be fully specified for the TypeHint.

Is this sort of thing supported or is there another way of joining multiple 
streams into a single stream, where each stream object will have a specific 
type of a common generic type?


Many thanks,

John



unsubscribe

2019-07-17 Thread Kailash Kota
unsubscribe

Thanks & Regards,
Kailash Kota
Product Development | JDA Software Pvt Ltd.
Ph: +91 80 6101 8649




[Table API] ClassCastException when converting a table to DataStream

2019-07-17 Thread Dongwon Kim
Hello,

Consider the following snippet:

> Table sourceTable = getKafkaSource0(tEnv);
> DataStream stream = tEnv.toAppendStream(sourceTable, Row.class)
>   .map(a -> a)
>   .returns(sourceTable.getSchema().toRowType());
> stream.print();
>
where sourceTable.printSchema() shows:

> root
>  |-- time1: TimeIndicatorTypeInfo(rowtime)



 This program returns the following exception:

> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
> at app.metatron.test.Main2.main(Main2.java:231)
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
> cast to java.lang.Long
>  at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> ...


The row serializer seems to try to deep-copy an instance of
java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?

Best,

- Dongwon

p.s. though removing .returns() makes everything okay, I need to do that as
I want to convert DataStream into another table later.
p.s. the source table is created as follows:

private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
> ConnectorDescriptor connectorDescriptor = new Kafka()
>   .version("universal")
>   .topic("mytopic")
>   .property("bootstrap.servers", "localhost:9092")
>   .property("group.id", "mygroup")
>   .startFromEarliest();
> FormatDescriptor formatDescriptor = new Csv()
>   .deriveSchema()
>   .ignoreParseErrors()
>   .fieldDelimiter(',');
> Schema schemaDescriptor = new Schema()
>   .field("time1", SQL_TIMESTAMP())
>   .rowtime(
> new Rowtime()
>   .timestampsFromField("rowTime")
>   .watermarksPeriodicBounded(100)
>   );
> tEnv.connect(connectorDescriptor)
>   .withFormat(formatDescriptor)
>   .withSchema(schemaDescriptor)
>   .inAppendMode()
>   .registerTableSource("mysrc");
> return tEnv.scan("mysrc");
>   }