Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Ufuk Celebi
Hey Aaron,

I'm glad to hear that you resolved the issue.

I think a docs contribution for this would be very helpful and could
update this page:
https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md.

If you want to create a separate JIRA ticket for this, ping me with
your JIRA username and I'll add you to the list of contributors (which
gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get
back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin  wrote:
>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we 
> resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to 
> `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, 
> putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
> * putting a jar with `hadoop-common` + its transitive dependencies, then 
> using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its 
> transitive dependencies). So we end up with a jar that has `SnappyCodec` and 
> whatever it needs to call transitively. We put this jar on the task manager 
> classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first 
> time but deploying a second time caused `libhadoop.so` to be loaded in a 
> second class loader. By putting a jar with `SnappyCodec` and its transitive 
> dependencies on the task manager classpath and specifying that `SnappyCodec` 
> needs to be loaded from the parent classloader, we ensure that only one 
> classloader loads `libhadoop.so`. I don't think this is the best way to 
> achieve what we want, but it works for now.
>
> Next steps: if no one is on it, I can take a stab at updating the 
> documentation to clarify how to debug and resolve Native library loading. 
> This was a nice learning experience and I think it'll be helpful to have this 
> in the docs for those who aren't well-versed in how classloading on the JVM 
> works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access 
> it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in 
> `flink-conf.yaml`
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job 
> manager classpath (I believe this is only used by the Job Managers when they 
> interact with S3 for checkpoints etc. I don't believe any user code is using 
> this).
> 3. Our flink applications consist of a "fat jar" that has some 
> `org.apache.hadoop` dependencies bundled with it. I believe this is the 
> source of why we're loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around 
> `org.apache.flink.api.common.io.FileInputFormat` which does the work with 
> sequence files. It looks like (after removing some stuff to make it more 
> clear):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
> typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
> with ResultTypeQueryable[T] {
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *
>   // This is where we'd see exceptions.
>   // *
>   override def open(fileSplit: FileInputSplit): Unit = {
>     super.open(fileSplit)
>     val config = new Configuration()
>     hadoopStream = WrappedHadoopInputStream.wrap(stream)
>     sequenceFileReader = new SequenceFile.Reader(config,
>       SequenceFile.Reader.stream(hadoopStream))
>     bufferNextRecord()
>   }
> ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
> extends InputStream
> with Seekable
> with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug 
> and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have
>> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>>
>> (2) I think renaming the file would not help as it will still be
>> loaded multiple times when the jobs restarts (as it happens in
>> FLINK-11402).
>>
>> (3) I'll try to check whether Flink's shading of Hadoop is related to
>> this. I don't think so though. @Chesnay (cc'd): What do you think?
>>
>> (4) @Aaron: Can you tell me which Hadoo

Re: No resource available error while testing HA

2019-01-29 Thread Averell
Hi Gary,

Thanks for the help.

Gary Yao-3 wrote
> You are writing that it takes YARN 10 minutes to restart the application
> master (AM). However, in my experiments the AM container is restarted
> within a
> few seconds after killing the process. If in your setup YARN actually
> needs 10 minutes to restart the AM, then you could try increasing the
> number
> of retry attempts by the client [2].

I think that comes from the difference in how we tested. When I tried to
kill the JM process (using kill -9 pid) then a new process got created
within some seconds. However, when I tried to test by crashing the server
(using init 0), then it needed some time. I found the yarn-site parameter
for that timer: yarn.am.liveness-monitor.expiry-interval-ms, which
defaults to 10 minutes [1].
I increased the REST client configuration as you suggested, and it did help.
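
(For anyone hitting the same thing, these are roughly the two knobs involved.
The keys come from the links in this thread; the values below are only
examples, not recommendations:)

```
<!-- yarn-site.xml: how long YARN waits before declaring a lost AM dead -->
<property>
  <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
  <value>600000</value>
</property>
```

```
# flink-conf.yaml: how many times the REST client retries before giving up
rest.retry.max-attempts: 30
```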


Gary Yao-3 wrote
> The REST API that is queried by the Web UI returns the root cause from the
> ExecutionGraph [3]. All job status transitions should be logged together
> with
> the exception that caused the transition [4]. Check for INFO level log
> messages that start with "Job [...] switched from state" followed by a
> stacktrace. If you cannot find the exception, the problem might be rooted
> in
> your log4j or logback configuration.

Thanks. I got the point.
I am using logback. I tried to configure rolling logs, but without success
yet. I will need to try more.

Thanks and regards,
Averell

[1]
https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml#yarn.am.liveness-monitor.expiry-interval-ms

  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: About KafkaConsumer and WM'ing and EventTime characteristics

2019-01-29 Thread Congxian Qiu
Hi Vishal
Maybe this doc [1] will be helpful for you.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
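
For illustration, a minimal sketch of what that section describes, reusing the
assigner from your snippet below. It assumes `env` is your
StreamExecutionEnvironment and `consumer` is the FlinkKafkaConsumer producing
your `KafkaRecord` type:

```java
// event-time semantics for the job
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Attach the assigner to the consumer itself so that watermarks are generated
// per Kafka partition inside the source; previousElementTimestamp is the
// timestamp that Kafka attached to the record.
consumer.assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<KafkaRecord>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(KafkaRecord lastElement,
                                                      long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }

            @Override
            public long extractTimestamp(KafkaRecord element,
                                         long previousElementTimestamp) {
                return previousElementTimestamp;
            }
        });

DataStream<KafkaRecord> stream = env.addSource(consumer);
```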
Best,
Congxian


Vishal Santoshi  于2019年1月30日周三 上午4:36写道:

> It seems from
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> that TimeCharacteristic.IngestionTime should do the trick.
>
> Just wanted to confirm that the ingestion time is the event time provided
> by the kafka producer.
>
> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi 
> wrote:
>
>> In cases where one needs to use Kafka event time (ingestion time)
>> for watermark generation and timestamp extraction, is setting the
>> TimeCharacteristic to EventTime enough?
>>
>> Or is this explicit code required?
>>
>> consumer.assignTimestampsAndWatermarks(new 
>> AssignerWithPunctuatedWatermarks<KafkaRecord>() {
>> @Nullable
>> @Override
>> public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long 
>> extractedTimestamp) {
>> return new Watermark(extractedTimestamp);
>> }
>>
>> @Override
>> public long extractTimestamp(KafkaRecord element, long 
>> previousElementTimestamp) {
>> return previousElementTimestamp;
>> }
>> });
>>
>>
>>
>>
>>
>>


Case When in Flink Table API

2019-01-29 Thread Soheil Pourbafrani
What is the correct way of using *Case When* in this example:

myTlb.select(
"o_orderdate.substring(0,4) as o_year,
 volume,
(when(n_name.like('%BRAZIL%'),volume).else(0)) as case_volume"
)

Flink errors on the line
(when(n_name.like('%BRAZIL%'),volume).else(0)) as case_volume"


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Aaron Levin
Hi Ufuk,

I'll answer your question, but first I'll give you an update on how we
resolved the issue:

* adding `org.apache.hadoop.io.compress.SnappyCodec` to
`classloader.parent-first-patterns.additional` in `flink-conf.yaml`
(though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
* putting a jar with `hadoop-common` + its transitive dependencies, then
using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and
its transitive dependencies). So we end up with a jar that has `SnappyCodec`
and whatever it needs to call transitively. We put this jar on the task
manager classpath.

I believe `SnappyCodec` was being called via our code. This worked the
first time but deploying a second time caused `libhadoop.so` to be loaded
in a second class loader. By putting a jar with `SnappyCodec` and its
transitive dependencies on the task manager classpath and specifying that
`SnappyCodec` needs to be loaded from the parent classloader, we ensure
that only one classloader loads `libhadoop.so`. I don't think this is the
best way to achieve what we want, but it works for now.
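
(Concretely, the relevant `flink-conf.yaml` entries we ended up with look
roughly like this; the java.library.path value is the one mentioned in my
answer to question 1 below:)

```
# load the Snappy codec parent-first so that only one classloader ever loads
# libhadoop.so (org.apache.hadoop.util.NativeCodeLoader also worked for us here)
classloader.parent-first-patterns.additional: org.apache.hadoop.io.compress.SnappyCodec

# point the task managers at the native hadoop libraries
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
```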

Next steps: if no one is on it, I can take a stab at updating the
documentation to clarify how to debug and resolve native library loading issues.
This was a nice learning experience and I think it'll be helpful to have
this in the docs for those who aren't well-versed in how classloading on
the JVM works!

To answer your questions:

1. We install hadoop on our machines and tell flink task managers to access
it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in
`flink-conf.yaml`
2. We put flink's shaded hadoop-fs-s3 on both the task manager and job
manager classpath (I believe this is only used by the Job Managers when
they interact with S3 for checkpoints etc. I don't believe any user code is
using this).
3. Our flink applications consist of a "fat jar" that has some
`org.apache.hadoop` dependencies bundled with it. I believe this is the
source of why we're loading `SnappyCodec` twice and triggering this issue.
4. For example code: we have a small wrapper around
`org.apache.flink.api.common.io.FileInputFormat` which does the work with
sequence files. It looks like (after removing some stuff to make it more
clear):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
    typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
  with ResultTypeQueryable[T] {
  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *
  // This is where we'd see exceptions.
  // *
  override def open(fileSplit: FileInputSplit): Unit = {
    super.open(fileSplit)
    val config = new Configuration()
    hadoopStream = WrappedHadoopInputStream.wrap(stream)
    sequenceFileReader = new SequenceFile.Reader(config,
      SequenceFile.Reader.stream(hadoopStream))
    bufferNextRecord()
  }
...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
  extends InputStream
  with Seekable
  with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to
debug and resolve this if it wasn't for you filing the ticket. Thank you so
much!

[0] https://github.com/pantsbuild/jarjar

Aaron Levin

On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:

> Hey Aaron,
>
> sorry for the late reply (again).
>
> (1) I think that your final result is in line with what I have
> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>
> (2) I think renaming the file would not help as it will still be
> loaded multiple times when the jobs restarts (as it happens in
> FLINK-11402).
>
> (3) I'll try to check whether Flink's shading of Hadoop is related to
> this. I don't think so though. @Chesnay (cc'd): What do you think?
>
> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
> some code so I can try to reproduce this exactly on my side? Judging
> from the earlier stack traces you have shared, I'm assuming you are
> trying to read Snappy-compressed sequence files.
>
> – Ufuk
>
> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin  wrote:
> >
> > I don't control the code calling `System.loadLibrary("hadoop")` so
> that's not an option for me, unfortunately.
> >
> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma  wrote:
> >>
> >> This may be caused by a  jvm process can only load a so once.So a triky
> way is to rename it。
> >>
> >> 发自我的 iPhone
> >>
> >> 在 2019年1月25日,上午7:12,Aaron Levin  写道:
> >>
> >> Hi Ufuk,
> >>
> >> Update: I've pinned down the issue. It's multiple classloaders loading
> `libhadoop.so`:
> >>
> >> ```
> >> failed to load native had

How to save table with header

2019-01-29 Thread Soheil Pourbafrani
Hi,
I can save tables in a CSV file like this:

TableSink q6Sink = new CsvTableSink(SinkPath, ",");
temp.writeToSink(q6Sink);

but I want to save the table with the table header as the first line. Is it
possible in Flink?


Re: How to load multiple same-format files with single batch job?

2019-01-29 Thread Fabian Hueske
Hi,

You can point a file-based input format to a directory and the input format
should read all files in that directory.
That works as well for TableSources that internally use file-based
input formats.
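
For example, a minimal batch sketch (assuming line-delimited JSON files under a
single directory and an hdfs:// path; the same idea applies to a file-based
TableSource by handing it the directory path):

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Pointing the file-based input at the directory makes it read all files in it.
DataSet<String> rawJsonLines = env.readTextFile("hdfs:///data/json-files");

// ... parse each line, convert to rows, and write to the JDBC sink as usual
```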
Is that what you are looking for?

Best, Fabian

Am Mo., 28. Jan. 2019 um 17:22 Uhr schrieb françois lacombe <
francois.laco...@dcbrain.com>:

> Hi all,
>
> I'm wondering if it's possible and what's the best way to achieve the
> loading of multiple files with a Json source to a JDBC sink ?
> I'm running Flink 1.7.0
>
> Let's say I have about 1500 files with the same structure (same format,
> schema, everything) and I want to load them with a *batch* job
> Can Flink handle the loading of one and each file in a single source and
> send data to my JDBC sink?
> I wish I could provide the URL of the directory containing my thousand files
> to the batch source to make it load all of them sequentially.
> My sources and sinks are currently available for BatchTableSource, I guess
> the cost to make them available for streaming would be quite expensive for
> me for the moment.
>
> Has anyone ever done this?
> Am I wrong to expect doing so with a batch job?
>
> All the best
>
> François Lacombe
>
>
>    
> 
> 
>
> [image: Arbre vert.jpg] Think of the planet; only print this paper if
> necessary
>


Re: About KafkaConsumer and WM'ing and EventTime characteristics

2019-01-29 Thread Vishal Santoshi
It seems from
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
that TimeCharacteristic.IngestionTime should do the trick.

Just wanted to confirm that the ingestion time is the event time provided
by the kafka producer.

On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi 
wrote:

> In cases where one needs to use Kafka event time (ingestion time) for
> watermark generation and timestamp extraction, is setting the
> TimeCharacteristic to EventTime enough?
>
> Or is this explicit code required?
>
> consumer.assignTimestampsAndWatermarks(new 
> AssignerWithPunctuatedWatermarks<KafkaRecord>() {
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long 
> extractedTimestamp) {
> return new Watermark(extractedTimestamp);
> }
>
> @Override
> public long extractTimestamp(KafkaRecord element, long 
> previousElementTimestamp) {
> return previousElementTimestamp;
> }
> });
>
>
>
>
>
>


About KafkaConsumer and WM'ing and EventTime characteristics

2019-01-29 Thread Vishal Santoshi
In cases where one needs to use Kafka event time (ingestion time) for
watermark generation and timestamp extraction, is setting the
TimeCharacteristic to EventTime enough?

Or is this explicit code required?

consumer.assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<KafkaRecord>() {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(KafkaRecord lastElement,
                                              long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }

    @Override
    public long extractTimestamp(KafkaRecord element,
                                 long previousElementTimestamp) {
        return previousElementTimestamp;
    }
});


Re: Issue setting up Flink in Kubernetes

2019-01-29 Thread Gary Yao
Hi Tim,

There is an end-to-end test in the Flink repository that starts a job cluster
in Kubernetes (minikube) [1]. If that does not help you, can you answer the
questions below?

What docker images are you using? Can you share the kubernetes resource
definitions? Can you share the complete logs of the JM and TMs? Did you follow
the steps outlined in the Flink documentation [2]?

Best,
Gary

[1]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh#L46
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html


On Tue, Jan 29, 2019 at 5:32 AM Timothy Victor  wrote:

> Hi -
>
> Has there been any update on the below issue?   I am also facing the same
> problem.
>
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3ccac2r2948lqsyu8nab5p7ydnhhmuox5i4jmyis9g7og6ic-1...@mail.gmail.com%3E
>
> There is a similar issue (
> https://stackoverflow.com/questions/50806228/cant-submit-job-with-flink-1-5-cluster)
> where task managers cannot reach the job manager, and the solution appeared
> to be to add JOB_MANAGER_RPC_ADDRESS to /etc/hosts.   However, the issue
> above is slightly different in that the TMs appear to try to use the
> Kubernetes pod name to connect.
>
> Thanks
>
> Tim
>


Re: Select fields in Table API

2019-01-29 Thread Fabian Hueske
The problem is that the table "lineitem" does not have a field
"l_returnflag".
The fields in "lineitem" are named [TMP_2, TMP_5, TMP_1, TMP_0, TMP_4,
TMP_6, TMP_3].

I guess it depends on how you obtained lineitem.

Best, Fabian

Am Di., 29. Jan. 2019 um 16:38 Uhr schrieb Soheil Pourbafrani <
soheil.i...@gmail.com>:

> Hi,
>
> I'm trying select some fields:
>
> lineitem
> .select(
> "l_returnflag," +
> "l_linestatus," +
> "l_quantity.sum as sum_qty," +
> "(l_extendedprice * (l_discount - 1)).sum as sum_disc_price," 
> +
> "l_extendedprice.sum as sum_base_price," +
> "(l_extendedprice * (l_discount - 1) * (l_tax + 1)).sum as 
> sum_charge," +
> "l_quantity.avg as avg_qty," +
> "l_extendedprice.avg as avg_price," +
> "l_discount.avg as avg_disc"
> );
>
> But it errors:
>
> Cannot resolve field [l_returnflag] given input [TMP_2, TMP_5, TMP_1,
> TMP_0, TMP_4, TMP_6, TMP_3].
>
> What is the problem?
>


Re: Connector for IBM MQ

2019-01-29 Thread Puneet Kinra
sorry for typo yep we developed few days back.

On Tue, Jan 29, 2019 at 10:27 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Yep I did we days back.
>
> On Tue, Jan 29, 2019 at 10:13 PM  wrote:
>
>> Hi all,
>>
>>
>>
>> I was wondering if anybody has anybody made a connector (Source) to be
>> used with IBM MQ?
>>
>> Also if somebody could point me to any doc on how to write a custom
>> connector, it would be appreciated.
>>
>>
>>
>> We are using Scala 2.12 and Flink 1.7.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Jacopo Gobbi
>>
>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Connector for IBM MQ

2019-01-29 Thread Puneet Kinra
Yep I did we days back.

On Tue, Jan 29, 2019 at 10:13 PM  wrote:

> Hi all,
>
>
>
> I was wondering if anybody has anybody made a connector (Source) to be
> used with IBM MQ?
>
> Also if somebody could point me to any doc on how to write a custom
> connector, it would be appreciated.
>
>
>
> We are using Scala 2.12 and Flink 1.7.
>
>
>
> Kind regards,
>
>
>
> Jacopo Gobbi
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Connector for IBM MQ

2019-01-29 Thread jacopo.gobbi
Hi all,

I was wondering if anybody has made a connector (Source) to be used 
with IBM MQ?
Also if somebody could point me to any doc on how to write a custom connector, 
it would be appreciated.

We are using Scala 2.12 and Flink 1.7.

Kind regards,

Jacopo Gobbi
Visit our website at http://www.ubs.com

This message contains confidential information and is intended only 
for the individual named.  If you are not the named addressee you 
should not disseminate, distribute or copy this e-mail.  Please 
notify the sender immediately by e-mail if you have received this 
e-mail by mistake and delete this e-mail from your system.

E-mails are not encrypted and cannot be guaranteed to be secure or 
error-free as information could be intercepted, corrupted, lost, 
destroyed, arrive late or incomplete, or contain viruses.  The sender 
therefore does not accept liability for any errors or omissions in the 
contents of this message which arise as a result of e-mail transmission.  
If verification is required please request a hard-copy version.  This 
message is provided for informational purposes and should not be 
construed as a solicitation or offer to buy or sell any securities 
or related financial instruments.

UBS reserves the right to retain all messages. Messages are protected
and accessed only in legally justified cases.

For information on how UBS processes and keeps secure your personal
data, please visit our Privacy Notice at
https://www.ubs.com/global/en/legal/privacy.html

Re: Flink Yarn Cluster - Jobs Isolation

2019-01-29 Thread Jamie Grier
Run each job individually as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn

Yes they will run concurrently and be completely isolated from each other.
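
i.e., one invocation like this per job (flags and numbers are just placeholders
taken from the linked page; each invocation starts its own YARN application
with its own JobManager and TaskManagers):

```
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 4096 /path/to/your-job.jar
```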

-Jamie


On Sun, Jan 27, 2019 at 6:08 AM Eran Twili 
wrote:

> Hi,
>
>
>
> In my company we are interested in running Flink jobs on a YARN cluster
> (on AWS EMR).
>
> We understand that there are 2 ways ('modes') to execute Flink jobs on a
> yarn cluster.
>
> We *must have the jobs run concurrently!*
>
> From what we understand so far those are the options:
>
>1. Start a long running yarn session, to which we'll send jobs.
>2. Run each job as a 'single job'.
>
> We searched the web to understand the difference and consequences of each
> option,
>
> (We read through flink-yarn-setup
> 
> and FLIP6
> ,
> along with many other references),
>
> but couldn't find clear comprehensive info.
>
>
>
> In the 'session' mode:
>
>1. Does running multiple jobs in a single session mean there's no job
>isolation?
>2. All jobs will run on the same jvm?
>3. Can we define different classpath for each job in this mode?
>
> In the 'single job' mode:
>
>1. Can we run multiple jobs concurrently?
>2. Is there a complete job isolation by default or do we need to
>configure it (different jvm/classpath)?
>
>
>
> Overall, what will be the different implications in aspects of resource
> management, security, and monitoring?
>
> Another question: what is the difference between multiple sessions of a
> single job vs multiple 'single job' executions?
>
>
>
> We'll be very thankful if someone could provide some answers or reference
> to a comprehensive documentation on those subjects.
>
>
>
> Regards,
>
> Eran
>
>
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>


Re: Forking a stream with Flink

2019-01-29 Thread Selvaraj chennappan
I think there is a misunderstanding. I want to compare the raw JSON and the
transformed record.
Hence I need two consumers and to merge the streams for comparison.
I have a pipeline defined. The pipeline does source (Kafka),
transformation, dedup and persisting to DB.
[image: image.png]

Before reaching the DB task, a lot of transformation is applied in the
pipeline. Therefore we want to validate the transformed record against the
raw JSON message which is available in Kafka.

Hence I want to know how to do that in Flink.
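
(If it helps, one rough way to express that comparison, not necessarily the
best one, is to connect the two streams, key both sides by some record id, and
compare them in a CoProcessFunction. Everything below is only a sketch with
hypothetical types RawJson, TransformedRecord, ValidationResult and a
getRecordId() accessor:)

```java
DataStream<ValidationResult> validated = rawJsonStream
        .connect(transformedStream)
        .keyBy(
                new KeySelector<RawJson, String>() {
                    @Override
                    public String getKey(RawJson raw) { return raw.getRecordId(); }
                },
                new KeySelector<TransformedRecord, String>() {
                    @Override
                    public String getKey(TransformedRecord rec) { return rec.getRecordId(); }
                })
        .process(new CoProcessFunction<RawJson, TransformedRecord, ValidationResult>() {
            // keep whichever side arrives first in keyed state, compare once the
            // matching element from the other stream shows up, then emit the result
            @Override
            public void processElement1(RawJson raw, Context ctx,
                                        Collector<ValidationResult> out) { /* ... */ }

            @Override
            public void processElement2(TransformedRecord rec, Context ctx,
                                        Collector<ValidationResult> out) { /* ... */ }
        });
```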


On Tue, Jan 29, 2019 at 8:54 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi Selvaraj
>
> In your POJO, add a data member such as status, and set it to error in case
> the record is invalid. Pass the output of the flatMap
> to the split operator; there you can split the stream.
>
> On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan <
> selvarajchennap...@gmail.com> wrote:
>
>> UseCase:- We have kafka consumer to read messages(json ) then it applies
>> to flatmap  for transformation based on the rules ( rules are complex ) and
>> convert it to pojo .
>> We want to verify the record(pojo) is valid by checking field by field of
>> that record .if record is invalid due to transformation rules  then move to
>> error topic otherwise send to DB.
>>
>> I thought of Implementing like adding another consumer to read json
>> message  and compare json message attributes with transformed record
>> attributes .
>>
>> Hence I need to join/coprocess these two streams to validate then decide
>> whether persist to db or sending to error topic.
>>
>> Please let me know if you need more information.
>>
>> On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:
>>
>>> Im not sure if i got your question correctly, can you elaborate more on
>>> your use case
>>>
>>
>>
>> --
>>
>>
>>
>>
>>
>> Regards,
>> Selvaraj C
>>
>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>

-- 





Regards,
Selvaraj C


Re: No resource available error while testing HA

2019-01-29 Thread Gary Yao
Hi Averell,

> Is there any way to avoid this? As if I run this as an AWS EMR job, the job
> would be considered failed, while it is actually restored automatically by
> YARN after 10 minutes.

You are writing that it takes YARN 10 minutes to restart the application
master (AM). However, in my experiments the AM container is restarted within a
few seconds after killing the process. If in your setup YARN actually
needs 10 minutes to restart the AM, then you could try increasing the number
of retry attempts by the client [2].

> Regarding logging, could you please help explain the source of the
> error messages shown in the "Exception" tab on the Flink Job GUI (as per the
> screenshot below).

The REST API that is queried by the Web UI returns the root cause from the
ExecutionGraph [3]. All job status transitions should be logged together with
the exception that caused the transition [4]. Check for INFO level log
messages that start with "Job [...] switched from state" followed by a
stacktrace. If you cannot find the exception, the problem might be rooted in
your log4j or logback configuration.

Best,
Gary

[1]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L767
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rest-retry-max-attempts
[3]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L87
[4]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1363

On Fri, Jan 25, 2019 at 12:42 PM Averell  wrote:

> Hi Gary,
>
> Yes, my problem mentioned in the original post had been resolved by
> correcting the zookeeper connection string.
>
> I have two other relevant questions, if you have time, please help:
>
> 1. Regarding JM high availability, when I shut down the host having JM
> running, YARN would detect that missing JM and start a new one after 10
> minutes, and the Flink job would be restored. However, on the console
> screen
> that I submitted the job, I got the following error messages: "/The program
> finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException/" (full stack
> trace in the attached file  flink_console_timeout.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink_console_timeout.log>
>
> )
> Is there any way to avoid this? As if I run this as an AWS EMR job, the job
> would be considered failed, while it is actually restored automatically
> by YARN after 10 minutes.
>
> 2. Regarding logging, could you please help explain the source of the
> error messages shown in the "Exception" tab on the Flink Job GUI (as per the
> screenshot below). I could not find any log files has that message (not in
> jobmanager.log or in taskmanager.log in EMR's hadoop-yarn logs folder).
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-01-25_at_22.png>
>
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Select fields in Table API

2019-01-29 Thread Soheil Pourbafrani
Hi,

I'm trying to select some fields:

lineitem
.select(
"l_returnflag," +
"l_linestatus," +
"l_quantity.sum as sum_qty," +
"(l_extendedprice * (l_discount - 1)).sum as sum_disc_price," +
"l_extendedprice.sum as sum_base_price," +
"(l_extendedprice * (l_discount - 1) * (l_tax +
1)).sum as sum_charge," +
"l_quantity.avg as avg_qty," +
"l_extendedprice.avg as avg_price," +
"l_discount.avg as avg_disc"
);

But it errors:

Cannot resolve field [l_returnflag] given input [TMP_2, TMP_5, TMP_1,
TMP_0, TMP_4, TMP_6, TMP_3].

What is the problem?


Filter Date type in Table API

2019-01-29 Thread Soheil Pourbafrani
Hi, I want to filter a field of type Date (java.sql.Date) like the
following:

filter("f_date <= '1998-10-02'")

and

filter("f_date <= '1998/10/02'")

Expression 'f_date <= 1998/10/02 failed on input check: Comparison is only
supported for numeric types and comparable types of same type, got Date and
String

I tried giving the date without single quotation marks, but it errors:
Expression 'f_date <= ((1998 / 10) / 2) failed on input check: Comparison
is only supported for numeric types and comparable types of same type, got
Date and Integer


Re: Forking a stream with Flink

2019-01-29 Thread Puneet Kinra
Hi Selvaraj

In your POJO, add a data member such as status, and set it to error in case
the record is invalid. Pass the output of the flatMap
to the split operator; there you can split the stream.
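
A rough sketch of that idea, assuming the flatMap emits a hypothetical POJO
`Record` with a getStatus() accessor, and two sinks `dbSink` / `errorTopicSink`
(this uses the split/select API that is still available in Flink 1.7):

```java
SplitStream<Record> splitStream = transformedStream.split(new OutputSelector<Record>() {
    @Override
    public Iterable<String> select(Record value) {
        // tag each record based on the status set during the flatMap transformation
        return Collections.singletonList("ERROR".equals(value.getStatus()) ? "error" : "valid");
    }
});

splitStream.select("valid").addSink(dbSink);          // persist valid records
splitStream.select("error").addSink(errorTopicSink);  // route invalid records to the error topic
```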

On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan <
selvarajchennap...@gmail.com> wrote:

> UseCase:- We have kafka consumer to read messages(json ) then it applies
> to flatmap  for transformation based on the rules ( rules are complex ) and
> convert it to pojo .
> We want to verify the record(pojo) is valid by checking field by field of
> that record .if record is invalid due to transformation rules  then move to
> error topic otherwise send to DB.
>
> I thought of Implementing like adding another consumer to read json
> message  and compare json message attributes with transformed record
> attributes .
>
> Hence I need to join/coprocess these two streams to validate then decide
> whether persist to db or sending to error topic.
>
> Please let me know if you need more information.
>
> On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:
>
>> Im not sure if i got your question correctly, can you elaborate more on
>> your use case
>>
>
>
> --
>
>
>
>
>
> Regards,
> Selvaraj C
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-29 Thread Chesnay Schepler
It is not viable for us, as of right now, to release both a lean and fat 
version of flink-dist.
We don't have the required tooling to assemble a correct NOTICE file for 
that scenario.


Besides that, this would also go against recent efforts to reduce the
total size of a Flink release, as we'd be increasing the total size again
by roughly 60% (and naturally also increase the compile time of releases),
which I'd like to avoid.

I like Stephan's compromise of excluding reporters and file-systems; this
removes more than 100 MB from the distribution yet still retains all the
user-facing APIs.

Do note that Hadoop will already not be included in the convenience binaries
for 1.8. This was the motivation behind the new section on the download page.

On 25.01.2019 06:42, Jark Wu wrote:

+1 for the leaner distribution and improve the "Download" page.

On Fri, 25 Jan 2019 at 01:54, Bowen Li  wrote:


+1 for leaner distribution and a better 'download' webpage.

+1 for a full distribution if we can automate it besides supporting the
leaner one. If we support both, I'd imagine release managers should be able
to package two distributions with a single change of parameter instead of
manually package the full distribution. How to achieve that needs to be
evaluated and discussed, probably can be something like 'mvn clean install
-Dfull/-Dlean', I'm not sure yet.


On Wed, Jan 23, 2019 at 10:11 AM Thomas Weise  wrote:


+1 for trimming the size by default and offering the fat distribution as
alternative download


On Wed, Jan 23, 2019 at 8:35 AM Till Rohrmann 
wrote:


Ufuk's proposal (having a lean default release and a user convenience
tarball) sounds good to me. That way advanced users won't be bothered by an
unnecessarily large release and new users can benefit from having many
useful extensions bundled in one tarball.

Cheers,
Till

On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:


On Wed, Jan 23, 2019 at 11:01 AM Timo Walther 

wrote:

I think what is more important than a big dist bundle is a helpful
"Downloads" page where users can easily find available filesystems,
connectors, metric reporters. Not everyone checks Maven central for
available JAR files. I just saw that we added an "Optional components"
section recently [1], we just need to make it more prominent. This is
also done for the SQL connectors and formats [2].

+1 I fully agree with the importance of the Downloads page. We
definitely need to make any optional dependencies that users need to
download easy to find.





Re: Flink Table API Sum method

2019-01-29 Thread Dawid Wysakowicz
Hi,

It depends on whether you are using Scala or Java.

In scala: table.agg('field1.sum)

In java (this should work in scala as well): table("field1.sum")

You can find more examples in the docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#aggregations
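
For example, following the aggregation examples in those docs (the field and
key names below are just placeholders):

```java
// global aggregate
Table total = table.select("field1.sum as total");

// or grouped by some key
Table perKey = table.groupBy("someKey").select("someKey, field1.sum as total");
```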

On 29/01/2019 15:13, Soheil Pourbafrani wrote:
> Hi,
>
> How can I use the Flink Table API SUM function? For example something
> like this:
> table.agg(sum("field1"))





Flink Table API Sum method

2019-01-29 Thread Soheil Pourbafrani
Hi,

How can I use the Flink Table API SUM function? For example something like
this:
table.agg(sum("field1"))


Re: connecting two streams flink

2019-01-29 Thread Selvaraj chennappan
I have a pipeline defined. The pipeline does source (Kafka),
transformation, dedup and persisting to DB.
[image: image.png]

Before reaching the DB task, a lot of transformation is applied in the
pipeline. Therefore we want to validate the transformed record against the
raw JSON message which is available in Kafka.

Hence I want to know how to do that in Flink.



On Tue, Jan 29, 2019 at 6:43 PM miki haiat  wrote:

> If c1 and c2 are  listing   to the same topic they will  consume the same
> data .
> so i cant understand this
>
>>  these two streams one(c2) is fast and other(c1)
>
>
>
>
>
> On Tue, Jan 29, 2019 at 2:44 PM Selvaraj chennappan <
> selvarajchennap...@gmail.com> wrote:
>
>> Team,
>>
>> I have two kafka consumer for same topic and want to join second stream
>> to first after couple of subtasks computation in the first stream then
>> validate the record . KT - C1 ,C2
>>
>> KT - C1 - Transformation(FlatMap) - Dedup - Validate --ifvalidsave it to
>> DB
>> -C2 - Process --
>>
>> if record is invalid then save it to Error topic .
>>
>> How do I merge these two streams one(c2) is fast and other(c1) is little
>> slow (two levels of computation) ?
>> Same record is flowing from C1-Flatmap-FlatMap and other consumer C2 . I
>> have to validate that record based on the rules.
>> Please find the attached image herewith reference.
>> [image: two-stream.png]
>>
>> --
>>
>>
>>
>>
>>
>> Regards,
>> Selvaraj C
>>
>

-- 





Regards,
Selvaraj C


Re: connecting two streams flink

2019-01-29 Thread miki haiat
If C1 and C2 are listening to the same topic they will consume the same
data,
so I can't understand this:

>  these two streams one(c2) is fast and other(c1)





On Tue, Jan 29, 2019 at 2:44 PM Selvaraj chennappan <
selvarajchennap...@gmail.com> wrote:

> Team,
>
> I have two kafka consumer for same topic and want to join second stream to
> first after couple of subtasks computation in the first stream then
> validate the record . KT - C1 ,C2
>
> KT - C1 - Transformation(FlatMap) - Dedup - Validate --ifvalidsave it to DB
> -C2 - Process --
>
> if record is invalid then save it to Error topic .
>
> How do I merge these two streams one(c2) is fast and other(c1) is little
> slow (two levels of computation) ?
> Same record is flowing from C1-Flatmap-FlatMap and other consumer C2 . I
> have to validate that record based on the rules.
> Please find the attached image herewith reference.
> [image: two-stream.png]
>
> --
>
>
>
>
>
> Regards,
> Selvaraj C
>


Re: Forking a stream with Flink

2019-01-29 Thread Selvaraj chennappan
Use case: We have a Kafka consumer to read messages (JSON); it then applies a
flatMap for transformation based on the rules (the rules are complex) and
converts them to POJOs.
We want to verify that the record (POJO) is valid by checking it field by
field. If the record is invalid due to the transformation rules, then we move
it to the error topic; otherwise we send it to the DB.

I thought of implementing this by adding another consumer to read the JSON
message and comparing the JSON message attributes with the transformed
record's attributes.

Hence I need to join/coprocess these two streams to validate and then decide
whether to persist to the DB or send to the error topic.

Please let me know if you need more information.

On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:

> Im not sure if i got your question correctly, can you elaborate more on
> your use case
>


-- 





Regards,
Selvaraj C


Re: Forking a stream with Flink

2019-01-29 Thread miki haiat
I'm not sure if I got your question correctly; can you elaborate more on
your use case?


connecting two streams flink

2019-01-29 Thread Selvaraj chennappan
Team,

I have two Kafka consumers for the same topic and want to join the second
stream to the first after a couple of subtasks of computation in the first
stream, then validate the record. KT - C1, C2

KT - C1 - Transformation (FlatMap) - Dedup - Validate -- if valid, save it to DB
   - C2 - Process --

If the record is invalid, then save it to the error topic.

How do I merge these two streams, where one (C2) is fast and the other (C1) is
a little slow (two levels of computation)?
The same record flows through C1 - FlatMap - FlatMap and through the other
consumer C2. I have to validate that record based on the rules.
Please find the attached image herewith for reference.
[image: two-stream.png]

-- 





Regards,
Selvaraj C


Forking a stream with Flink

2019-01-29 Thread Daniel Krenn
Hello Flink Community,

Let's say I have multiple machines I get data from. I want to process the
data of each machine separately, but in the same way. Is it possible to
"fork" a stream by some parameter and then process the forks
independently from each other, natively? Or do I need to do that in some
other way?

Regards,
Daniel


Re: Checkpoints failed with NullPointerException

2019-01-29 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks for reporting this.

This looks like a bug that we fixed in Flink 1.7.1 [1].

Would you be able to try with 1.7.1 and see if the issue is still happening
for you?

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-11094

On Tue, Jan 29, 2019, 6:29 PM Averell  wrote:

> I tried to create a savepoint on HDFS, and got the same exception:
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 028e392d02bd229ed08f50a2da5227e2 failed.
> at
>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
> at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
> failed: Could not perform checkpoint 35 for operator Merge sourceA&sourceB
> (7/16).
> at
>
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970)
> at
>
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
>
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
> at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
> at
>
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
> at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
>
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:2

Re: Checkpoints failed with NullPointerException

2019-01-29 Thread Averell
I tried to create a savepoint on HDFS, and got the same exception:


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
028e392d02bd229ed08f50a2da5227e2 failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
failed: Could not perform checkpoint 35 for operator Merge sourceA&sourceB
(7/16).
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
at
org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
at
org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint failed: Could not perform checkpoint 35 for operator Merge
sourceA&sourceB (7/16).
at
java.util.concurrent.CompletableFuture.encodeThrowable(Completab

Re: KeyBy is not creating different keyed streams for different keys

2019-01-29 Thread Congxian Qiu
Hi Harshith

You can replace the GenericDataObject with a Tuple3 and keyBy("A", "B")
with keyBy(1, 2), then give it a try.
You can also see the doc [1] for reference.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
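
For example, keeping the POJO, a key selector as in the linked section could
look roughly like this (getA()/getB() are assumed to be the usual POJO getters):

```java
KeyedStream<GenericDataObject, Tuple2<String, String>> keyed =
        genericDataObjectDataStream.keyBy(
                new KeySelector<GenericDataObject, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(GenericDataObject value) {
                        // composite key built from the two fields used for keying
                        return Tuple2.of(value.getA(), value.getB());
                    }
                });
```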
Best,
Congxian


Kumar Bolar, Harshith  于2019年1月29日周二 下午12:49写道:

> Typo: lines 1, 2 and 5
>
>
>
> *From: *Harshith Kumar Bolar 
> *Date: *Tuesday, 29 January 2019 at 10:16 AM
> *To: *"user@flink.apache.org" 
> *Subject: *KeyBy is not creating different keyed streams for different
> keys
>
>
>
> Hi all,
>
>
>
> I'm reading a simple JSON string as input and keying the stream based on
> two fields A and B. But KeyBy is generating the same keyed stream for
> different values of B but for a particular combination of A and B.
>
>
>
> The input:
>
>
>
> {
>
> "A": "352580084349898",
>
> "B": "1546559127",
>
> "C": "A"
>
> }
>
>
>
> This is the core logic of my Flink code:
>
>
>
> DataStream genericDataObjectDataStream = inputStream
>
> .map(new MapFunction() {
>
> @Override
>
> public GenericDataObject map(String s) throws Exception {
>
> JSONObject jsonObject = new JSONObject(s);
>
> GenericDataObject genericDataObject = new
> GenericDataObject();
>
> genericDataObject.setA(jsonObject.getString("A"));
>
> genericDataObject.setB(jsonObject.getString("B"));
>
> genericDataObject.setC(jsonObject.getString("C"));
>
> return genericDataObject;
>
> }
>
> });
>
> DataStream testStream = genericDataObjectDataStream
>
> .keyBy("A", "B")
>
> .map(new MapFunction() {
>
> @Override
>
> public GenericDataObject map(GenericDataObject
> genericDataObject) throws Exception {
>
> return genericDataObject;
>
> }
>
> });
>
> testStream.print();
>
>
>
> GenericDataObject is a POJO with three fields A, B and C .
>
>
>
> And this is the console output for different values of field B.
>
>
>
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
>
> 5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
>
> 4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
>
> 3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
>
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
>
>
>
> Notice lines 1, 2 and 3. Even though they have different values of B, they
> are being put in the same keyed stream (5). I must be doing something
> fundamentally wrong here.
>
>
>
> Thanks,
>
> Harshith
>
>
>


Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-01-29 Thread Chris Miller
Thanks Timo, I didn't realise supplying Row could automatically apply 
the correct types. In this case your suggestion doesn't solve the 
problem though, I still get the exact same error. I assume that's 
because there isn't a time attribute type on the tradesByInstr table 
itself, but rather on the groupedTrades table that it joins with.


System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
->  Row(InstrumentId: Integer, Name: String, ClosePrice: Double, 
TradeCount: Long, Quantity: Double, Cost: Double)


System.out.println(groupedTrades.getSchema().toRowType()) outputs:
->  Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount: 
Long, Quantity: Double, Cost: Double, LastTrade_EventTime: 
TimeIndicatorTypeInfo(rowtime))


Looking at the stack trace it seems the query optimiser is tripping up 
on the LastTrade_EventTime column, but that is required for the temporal 
table join.


Any other ideas on how I can work around this problem?

Many thanks,
Chris

-- Original Message --
From: "Timo Walther" 
To: "Chris Miller" ; "user" 
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)


Hi Chris,

the exception message is a bit misleading. The time attribute (time 
indicator) type is an internal type and should not be used by users.


The following line should solve your issue. Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, typeInfo);


You can do

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, Row.class);


The API will automatically insert the right types for the table passed 
when using a plain `Row.class`.


I hope this helps.

Regards,
Timo



Am 25.01.19 um 20:14 schrieb Chris Miller:
I'm trying to group some data and then enrich it by joining with a 
temporal table function, however my test code (attached) is failing 
with the error shown below. Can someone please give me a clue as to 
what I'm doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type 
$5 TIMESTAMP(3)
at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)

at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at 
org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at 
org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)

at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)

at test.Test.main(Test.java:78)




Checkpoints failed with NullPointerException

2019-01-29 Thread Averell
Hi everyone,

I am getting NullPointerException when the job is creating checkpoints.
My configuration is: Flink 1.7.0 running on AWS EMR, using the incremental
RocksDBStateBackend on S3. Sinks are Parquet files on S3 and Elasticsearch
(I'm not sure whether the sinks are relevant to this error). There had been many
successful checkpoints before it started failing.
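
(For context, here is a minimal sketch of that kind of setup; the checkpoint
interval and bucket path below are illustrative assumptions, not values from
this job:)

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalRocksDbOnS3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds (illustrative interval).
        env.enableCheckpointing(60_000L);
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true));
        // Tiny stand-in pipeline; the real job's sources, merge operator and
        // Parquet/Elasticsearch sinks are omitted here.
        env.fromElements(1, 2, 3).print();
        env.execute("incremental-rocksdb-sketch");
    }
}
```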

JobManager and TaskManagers' logs showed no issue.
Below is an extract from the "Exception" tab in the Flink GUI (I don't
know where the Flink GUI collected this from):

java.lang.Exception: Could not perform checkpoint 29 for operator Merge
sourceA&sourceB (2/16).
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:273)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 29 for operator
Merge sourceA&sourceB  (2/16).
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
... 8 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.snapshotMetaData(RocksIncrementalSnapshotStrategy.java:233)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:152)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:128)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:496)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
... 13 more

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-01-29 Thread Timo Walther

Hi Chris,

the exception message is a bit misleading. The time attribute (time 
indicator) type is an internal type and should not be used by users.


The following line should solve your issue. Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, typeInfo);


You can do

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = 
tableEnv.toRetractStream(tradesByInstr, Row.class);


The API will automatically insert the right types for the table passed 
when using a plain `Row.class`.
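
A self-contained sketch of that conversion (the example table and field names
here are made up, not taken from Chris's job):

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Register a small example stream as a table.
        DataStream<Tuple2<Integer, Double>> trades = env.fromElements(
                Tuple2.of(1, 100.0), Tuple2.of(1, 250.0), Tuple2.of(2, 75.0));
        tableEnv.registerDataStream("Trades", trades, "instrumentId, cost");

        // An aggregation, so the result is an updating table that needs a retract stream.
        Table tradesByInstr = tableEnv.sqlQuery(
                "SELECT instrumentId, COUNT(*) AS tradeCount, SUM(cost) AS cost " +
                "FROM Trades GROUP BY instrumentId");

        // Passing Row.class lets the API derive the field types from the table's
        // schema instead of a hand-built TypeInformation.
        DataStream<Tuple2<Boolean, Row>> tradesByInstrStream =
                tableEnv.toRetractStream(tradesByInstr, Row.class);

        tradesByInstrStream.print();
        env.execute("retract-stream-sketch");
    }
}
```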


I hope this helps.

Regards,
Timo



Am 25.01.19 um 20:14 schrieb Chris Miller:
I'm trying to group some data and then enrich it by joining with a 
temporal table function; however, my test code (attached) is failing 
with the error shown below. Can someone please give me a clue as to 
what I'm doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type 
$5 TIMESTAMP(3)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)

    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
    at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
    at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
    at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
    at test.Test.main(Test.java:78) 





Re: SQL Client (Streaming + Checkpoint)

2019-01-29 Thread Timo Walther

Hi Vijay,

in general Yun is right, the SQL Client is still in an early prototyping 
phase. Some configuration features are missing.


You can track the progress of this feature here: 
https://issues.apache.org/jira/browse/FLINK-10265


It should be possible to use the global Flink configuration for now as a 
workaround:


https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#checkpointing
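
For example, a flink-conf.yaml along these lines (paths and values are
illustrative assumptions) sets the checkpointing-related options globally:

```
# State backend used for checkpoints ('jobmanager', 'filesystem' or 'rocksdb').
state.backend: filesystem
# Where checkpoint data and metadata are written.
state.checkpoints.dir: hdfs:///flink/checkpoints
# Default target directory for savepoints.
state.savepoints.dir: hdfs:///flink/savepoints
```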

Regards,
Timo

Am 28.01.19 um 18:41 schrieb Yun Tang:

Hi Vijay

Unfortunately, the current SQL Client does not support enabling 
checkpointing through its configuration. The current execution properties 
for the SQL Client cover both the batch and streaming environments, and the 
batch environment does not support checkpointing. I see the current 
SQL Client as a tool for prototyping; it is not production-ready yet.


Best
Yun Tang

*From:* Vijay Srinivasaraghavan 
*Sent:* Tuesday, January 29, 2019 0:53
*To:* User
*Subject:* SQL Client (Streaming + Checkpoint)
It looks like the SQL Client does not enable checkpointing when 
submitting the streaming query job. Has anyone else noticed this 
behavior? FYI, I am using the 1.6.x branch.


Regards
Vijay





Re: date format in Flink SQL

2019-01-29 Thread Timo Walther

Hi Soheil,

the functions for date/time conversion are pretty limited so far. The 
full list of supported functions can be found here [1]. If you need more 
(which is usually the case), it is easy to implement a custom function [2].
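
As a hedged sketch of such a custom function (class, method and registration 
names below are made up for illustration, not taken from the linked docs), a 
scalar UDF could parse strings like "1996-8-01" into java.sql.Date:

```
import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Illustrative scalar UDF: parses "1996-8-01"-style strings into java.sql.Date.
public class ParseDate extends ScalarFunction {

    // Static so the non-serializable formatter is not part of the function's state.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-M-d");

    public Date eval(String s) {
        if (s == null) {
            return null;
        }
        try {
            return Date.valueOf(LocalDate.parse(s.trim(), FORMAT));
        } catch (DateTimeParseException e) {
            return null; // unparseable input becomes SQL NULL
        }
    }
}
```

It would then be registered with something like 
tableEnv.registerFunction("parseDate", new ParseDate()); and used as 
parseDate(date_string) in SQL.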


We rely on Java's java.sql.Date as a data type. You can use `SELECT DATE 
'2018-09-23'` to create literals.
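
For example (the table and column names here are made up), a filter on a DATE 
column against such a literal could look like this; note the literal is 
written in zero-padded yyyy-MM-dd form:

```
// Assumes a table "Orders" with a DATE column "order_date" has been registered.
Table before1996Aug = tableEnv.sqlQuery(
        "SELECT * FROM Orders WHERE order_date < DATE '1996-08-01'");
```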


I hope this helps.

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions


Am 29.01.19 um 09:42 schrieb Soheil Pourbafrani:

Hi,

I want to convert a string in the format 1996-8-01 to a date and 
create a Table from the resulting dataset of Tuple3 at the 
end. Since I want to apply SQL queries on the date field of the table, 
for example "date_column < 1996-8-01", which Java date type is 
supported in Flink?





date format in Flink SQL

2019-01-29 Thread Soheil Pourbafrani
Hi,

I want to convert a string in the format 1996-8-01 to a date and create
a Table from the resulting dataset of Tuple3 at the end. Since I
want to apply SQL queries on the date field of the table, for
example "date_column < 1996-8-01", which Java date type is supported
in Flink?