Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal


I am not sure I can fully answer your question, but after digging through
some code, I doubt the C* connector fully supports Scala case classes with
CQL data mapping at the moment. I may be totally wrong, and you would need
to ask the flink dev about this. However, I have some toy examples that you
could check out, which use CassandraScalaProductSinkBuilder + a predefined
CQL query + entity. They do not use Scala case classes, so they may not fit
your need.

You may find the example snippet at
https://github.com/mcfongtw/flink-cassandra-connector-examples/

Regards,

On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong <mcfong.o...@gmail.com> wrote:

> Hi, shashank agarwal
>
>
> AFAIK, on the Java side, for a POJO data type you don't need to set a
> query, since the CQL data mapping takes care of that, whereas when dealing
> with Java tuples you do need to provide an upsert query so that Cassandra
> knows what to insert into the table.
> The Scala tuple case is clear, same as Java: provide a CQL query. However,
> I don't know what happens with the Scala POJO (case class) case...
>
> Regards,
>
> Michael
>


Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-27 Thread Michael Fong
Hi, shashank agarwal


AFAIK, on the Java side, for a POJO data type you don't need to set a
query, since the CQL data mapping takes care of that, whereas when dealing
with Java tuples you do need to provide an upsert query so that Cassandra
knows what to insert into the table.
The Scala tuple case is clear, same as Java: provide a CQL query. However,
I don't know what happens with the Scala POJO (case class) case...

Regards,

Michael


Re: Building scala examples

2017-09-26 Thread Michael Fong
Thanks, Nico.


I looked again at flink-examples-streaming_2.10-1.4-SNAPSHOT.jar, and it
indeed contains both.

Originally I was looking at the self-contained jars, since I used them as
examples to create and run my own streaming program. They only contain
compiled Java classes, if I am not mistaken.

Let me try to create a Scala example with a similar build procedure.

Thanks!


On Mon, Sep 25, 2017 at 10:41 PM, Nico Kruber <n...@data-artisans.com>
wrote:

> Hi Michael,
> from what I see, Java and Scala examples reside in different packages, e.g.
> * org.apache.flink.streaming.scala.examples.async.AsyncIOExample vs.
> * org.apache.flink.streaming.examples.async.AsyncIOExample
>
> A quick run on the Flink 1.3 branch revealed
> flink-examples-streaming_2.10-1.3-SNAPSHOT.jar containing both (which you
> can verify with your favorite archiver tool for zip files).
>
> Afaik, there is no simple switch to turn off Java or Scala examples. You
> may either adapt the pom.xml or create your own project with the examples
> and programming languages you need.
>
>
> Nico
>
>
> On Saturday, 23 September 2017 12:45:04 CEST Michael Fong wrote:
> > Hi,
> >
> > I am studying how to build a scala program from flink-examples/.
> >
> > I can see there are two source folders java/ and scala/ from IntelliJ,
> and
> > for most examples, there is a copy of examples for Java and Scala.
> > Executing 'mvn clean package -Pbuild-jar' results in a jar file under
> > target/. I am wondering if that is a Java or Scala example that I just
> > compiled. In addition, is there a way to selectively choose the Java or
> > Scala example to build with the current maven settings?
> >
> > Thanks in advance,
>
>
>


Re: need instruction on how the Flink metric works

2017-09-19 Thread Michael Fong
I just did the same test as you with SocketWindowWordCount, and the
counter showed up all right.

You should probably connect JConsole to localhost:28781 (or whatever port
your JMX server listens on).

That is how I set up the environment; perhaps there are better ways to do it.

On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao <jiewens...@gmail.com> wrote:

> Still got stuck, here are my steps (on my laptop)
>
> for example:
> Step1:
>
> public class MetricsTest<T> extends RichMapFunction<T, T> {
>
>     private static final long serialVersionUID = 1L;
>
>     private org.apache.flink.metrics.Meter meter;
>     private Counter counter;
>
>     @Override
>     public void open(Configuration config) {
>         this.counter = getRuntimeContext()
>             .getMetricGroup()
>             .counter("my-counter");
>
>         this.meter = getRuntimeContext()
>             .getMetricGroup()
>             .meter("my-meter",
>                 new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
>     }
>
>     @Override
>     public T map(T item) throws Exception {
>         this.counter.inc();
>         this.meter.markEvent();
>         return item;
>     }
> }
>
>
>
>
> And I did followings in one of the Flink sample
> (SocketWindowWordCount.java):
> Step2:
>
> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");
>
> text.map(new MetricsTest<String>());  //<-- added this line
>
>
> Step3:
>
> mvn clean install
>
>
> step4: nc -l 12345
>
>
> step5:
>
> flink run -c [my_class_name] my.jar
>
>
> step6:  (type something under nc terminal)
>
> run jconsole, pick the local process for this "flink run", and click the
> tab "MBeans" (I don't see my metrics, only the system ones; is that the
> right place to look?)
>
>
> and flink-conf.yaml has:
>
> # metrics
>
> metrics.reporters: jmx
>
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>
> metrics.reporter.jmx.port: 28780-28790
>
>
> and taskmanager log looks ok regarding JMX
>
>
> did I miss steps or configurations? Thanks a lot!
>
>
>
>
> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mcfong.o...@gmail.com>
> wrote:
>
>> Hi,
>>
>> There are several possibilities:
>> 1. Please check if reporter is set up ( guide
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>>  )
>> For example, I would make sure my local JMXReporter service is up and
>> running by checking taskmanager.log and search for the line:
>>
>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>> - Started JMX server on port 28781.
>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>> - Configured JMXReporter with {port:28780-28790}
>>
>> If for any reason the JMX server does not start up, you might see some
>> errors:
>>
>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry  - Configuring JMXReporter with {port=28781, class=org.apache.flink.metrics.jmx.JMXReporter}.
>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry  - Could not instantiate metrics reporter jmx. Metrics might not be exposed/reported.
>> java.lang.RuntimeException: Could not start JMX server on any configured port. Ports: 28781
>>     at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:126)
>>     at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
>>     at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188)
>>     at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1984)
>>     at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1823)
>>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
>>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>>     at scala.util.Try$.apply(Try.scala:192)
>>
>>

Re: need instruction on how the Flink metric works

2017-09-18 Thread Michael Fong
Hi,

There are several possibilities:
1. Please check if reporter is set up ( guide
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
 )
For example, I would make sure my local JMXReporter service is up and
running by checking taskmanager.log and search for the line:

2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
- Started JMX server on port 28781.
2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
- Configured JMXReporter with {port:28780-28790}

If for any reason the JMX server does not start up, you might see some
errors:

2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry  - Configuring JMXReporter with {port=28781, class=org.apache.flink.metrics.jmx.JMXReporter}.
2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry  - Could not instantiate metrics reporter jmx. Metrics might not be exposed/reported.
java.lang.RuntimeException: Could not start JMX server on any configured port. Ports: 28781
    at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:126)
    at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
    at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188)
    at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1984)
    at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1823)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
    at scala.util.Try$.apply(Try.scala:192)


Here is my local setup for conf/flink-conf.yaml for example:
metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 28780-28790

2. You might want to try a real streaming example that executes
continuously. If I remember correctly, when a task completes, the task
manager seems to release the associated resources and objects. Your
example only processes a few strings, which finishes in a matter of
milliseconds, before you can bring up jconsole manually.

Hope some of these help,



On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <jiewens...@gmail.com> wrote:

> Thanks. When I started jconsole, it listed
> com.apache.flink.runtime.jobmanager..:[port]
> as one of the local processes, and I was able to connect to it with an
> insecure connection, but I was not able to locate the Counter metric; I
> only saw some system metrics.
>
> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mcfong.o...@gmail.com>
> wrote:
>
>> Hi,
>>
>> You may enable a metrics reporter to see the output of your metrics;
>> the counter, in your example.
>>
>> There is brief documentation regarding metrics and reporter setup at
>> link
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>> you may see your metrics via JConsole.
>>
>> Hope this helps.
>>
>> Regards,
>>
>>
>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <jiewens...@gmail.com>
>> wrote:
>>
>>> I'm new to Flink and I have read
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html,
>>> but I am still unclear on where to read the metrics I added.
>>>
>>> for example,
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>     env.setParallelism(2);
>>>
>>>     List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
>>>         "Parquet", "ORC", "Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");
>>>
>>>     DataStreamSource<String> source = env.fromCollection(wordList);
>>>
>>>     DataStream<Tuple2<String, Integer>> dataStream = env
>>>         .fromCollection(wordList).map(new WordLengthCounter());
>>>
>>>     dataStream.print();
>>>
>>>     env.execute();
>>> }
>>>
>>>

Re: need instruction on how the Flink metric works

2017-09-17 Thread Michael Fong
Hi,

You may enable a metrics reporter to see the output of your metrics; the
counter, in your example.

There is brief documentation regarding metrics and reporter setup at
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
The easiest approach, in my opinion, is to set up a JMX reporter so that
you may see your metrics via JConsole.

Hope this helps.

Regards,


On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao  wrote:

> I'm new to Flink and I have read
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html,
> but I am still unclear on where to read the metrics I added.
>
> for example,
>
> public static void main(String[] args) throws Exception {
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     env.setParallelism(2);
>
>     List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
>         "Parquet", "ORC", "Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");
>
>     DataStreamSource<String> source = env.fromCollection(wordList);
>
>     DataStream<Tuple2<String, Integer>> dataStream = env
>         .fromCollection(wordList).map(new WordLengthCounter());
>
>     dataStream.print();
>
>     env.execute();
> }
>
>
> and
>
>
> public class WordLengthCounter
>         extends RichMapFunction<String, Tuple2<String, Integer>> {
>
>     private static final long serialVersionUID = 1L;
>
>     private Counter counter;
>
>     @Override
>     public void open(Configuration config) {
>         this.counter = getRuntimeContext()
>             .getMetricGroup()
>             .counter("myCounter");
>     }
>
>     @Override
>     public Tuple2<String, Integer> map(String value) throws Exception {
>         this.counter.inc();
>         return new Tuple2<>(value, value.length());
>     }
> }
>
>
> Now, where do I see the counter? Sorry for the naive question.
>
> Can anyone point me to a good end-to-end "hello world" example for Flink
> metrics?
>


Re: Sink metric numRecordsIn drops temporarily

2017-09-17 Thread Michael Fong
Hi,

Thank you for your description. What I am trying to understand is what the
counter value is at the moment of those spikes. Grafana takes the average
of a continuous range of data values before rendering the result in the UI.
That is, if the metric value is not transmitted continuously, and some data
points appear as zeros, then the average value over time will be lower than
the snapshot value. I would suggest first checking what the value is by
zooming in to the minimum scale allowed by the data retention policy set in
Graphite (per minute or per second, depending on settings).
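As a quick arithmetic sketch of that averaging effect (the sample values
below are hypothetical, chosen only for illustration):

```java
public class GapAveraging {

    // Plain arithmetic mean over one rendering window, the way a graphing
    // front-end might aggregate per-minute samples before drawing a point.
    public static double average(long[] samples) {
        double sum = 0;
        for (long s : samples) {
            sum += s;
        }
        return sum / samples.length;
    }

    public static void main(String[] args) {
        // Hypothetical window: the counter really sits at 26910 the whole
        // time, but two samples were never transmitted and were stored as 0.
        long[] window = {26910, 0, 0, 26910};
        System.out.println(average(window)); // 13455.0, half the snapshot value
    }
}
```

This is why zooming in to the raw per-sample scale, rather than a
multi-day view, shows what the counter actually reported.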


I do not have a concrete answer for that counter in Flink; perhaps someone
who knows the semantics of this metric better does. However, there is one
possibility, which we have observed in other Java applications. It usually
happens to a fast-growing counter when its next value exceeds the positive
upper bound of its type. Normally, a metrics library does not reset the
value to 0; for the long data type, Long.MAX_VALUE + 1 == Long.MIN_VALUE.
Taking NonNegativeDerivative( delta ) over such a wrap-around then produces
a very high peak in the graph.
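The wrap-around itself is easy to demonstrate with plain Java. This is a
sketch of the generic long-overflow behavior, not of Flink's actual
counter implementation:

```java
public class CounterWrap {

    // Increment the way a metrics counter would; long addition silently
    // wraps in two's complement once it passes Long.MAX_VALUE.
    public static long inc(long counter) {
        return counter + 1;
    }

    public static void main(String[] args) {
        long before = Long.MAX_VALUE;
        long after = inc(before);
        System.out.println(after == Long.MIN_VALUE); // true: the counter "jumps" negative
        // A derivative computed on the sampled values sees a step of about
        // -1.8e19, which derivative-style graph functions can render as an
        // extreme spike or drop.
        System.out.println((double) after - (double) before);
    }
}
```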

Hope this helps.

On Sun, Sep 17, 2017 at 11:02 PM, Philipp Bussche  wrote:

> Hi, thank you for your answer. For September 11th, which is shown in the
> screenshot, the counter sat at 26.91k; when the drop happened, it went
> down to 26.01k. This happened 3 times during that day, and it always went
> back to the same value.
> Philipp
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Sink metric numRecordsIn drops temporarily

2017-09-17 Thread Michael Fong
Missed to cc to user@flink.apache.org

Hi,
>
> Just wondering: what is the value of that counter (without applying the
> NonNegativeDerivative function) when you observe the spikes? If I remember
> correctly, Grafana aggregates values by averaging them across the selected
> time duration before rendering to the front-end. The charts show values
> across multiple days; what values does that metric show at minute scale?
>
> Regards,
>
> Michael
>
> On Sun, Sep 17, 2017 at 9:17 PM, Philipp Bussche <
> philipp.buss...@gmail.com> wrote:
>
>> Hi there, I witnessed an interesting behaviour on my Grafana dashboard:
>> sink-related metrics show activity where there should not be any. I say
>> this because, for this particular sink, activity is triggered by data
>> being ingested through a cronjob at a particular time, yet the dashboard
>> shows activity outside this time as well.
>> I had a closer look: in my graph I am using the NonNegativeDerivative
>> function (the data actually sits in Graphite) on the metric. Disabling
>> this filter shows that, for a short period of time, the numRecordsIn
>> counter drops and then returns to its previous value. This drop then
>> shows on the graph and looks like data activity because of the
>> NonNegativeDerivative function.
>> Why would the value of a counter temporarily decrease and then go back
>> to its previous level?
>> Please see screenshots attached.
>>
>> Thanks
>> Philipp
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t576/Sink_numRecordsIn_NonNegativeDerivative_.png>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t576/Sink_numRecordsIn_Value_Drops.png>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Quick start guide

2017-09-03 Thread Michael Fong
Hi,

I was following the quick start guide in the official documentation, and I
came across a line that caused a bit of confusion:

$ tail -f log/flink-*-jobmanager-*.out

It says the wordcount program should print its output to that file.
However, when I ran the code locally (master branch, in the IDE), I found
the output in taskmanager-*.out instead:

==> log/flink-*-taskmanager-*.out <==
ddd : 1
ccc : 1
bbb : 2
abc : 2
aba : 1
aab : 1
ddd : 1
aaa : 1
abca : 1
abac : 6


Is this the intended behavior, or a typo in the document? Thanks in
advance.

Regards,