Lookup join or enrichment join against a changelog stream in Apache Flink Table API

2023-01-11 Thread Colin Williams
 

I'm interested in doing a "lookup join" or "enrichment join" against a
"changelog stream" read by "upsert-kafka", and I'm wondering whether this is
possible with the Table API. I found
https://github.com/fhueske/flink-sql-demo#enrichment-join-against-temporal-table
but when I read
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
it says that temporal table functions do not work on changelog inputs.

So it appears that I can't perform the "enrichment join" this way.

Is there a technique I am missing?

Or is there a reason I'm missing why we should not perform this join?
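To make the question concrete, this is roughly what I'd like to write: an
event-time temporal join where the upsert-kafka table is declared as a
versioned table (primary key plus watermark). This is only a sketch with
made-up topic and field names, and I'm not sure whether it is the supported
path for changelog inputs:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object EnrichmentJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Probe side: an ordinary append-only Kafka table with an event-time attribute.
    tEnv.executeSql(
      """CREATE TABLE orders (
        |  order_id STRING,
        |  currency STRING,
        |  amount DECIMAL(10, 2),
        |  order_time TIMESTAMP(3),
        |  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'orders',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'properties.group.id' = 'orders-demo',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json'
        |)""".stripMargin)

    // Build side: the upsert-kafka changelog, declared as a versioned table
    // (primary key + watermark).
    tEnv.executeSql(
      """CREATE TABLE rates (
        |  currency STRING,
        |  rate DECIMAL(10, 2),
        |  update_time TIMESTAMP(3),
        |  WATERMARK FOR update_time AS update_time,
        |  PRIMARY KEY (currency) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'rates',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'raw',
        |  'value.format' = 'json'
        |)""".stripMargin)

    // The enrichment itself: each order is joined against the version of the
    // rate that was valid at the order's event time.
    tEnv.sqlQuery(
      """SELECT o.order_id, o.amount, r.rate, o.order_time
        |FROM orders AS o
        |LEFT JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
        |ON o.currency = r.currency""".stripMargin)
      .execute()
      .print()
  }
}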


Re: Parallelizing a tumbling group window

2018-01-11 Thread Colin Williams
Thanks for the reply. Unfortunately that project was unexpectedly cancelled,
for unrelated reasons. I was happy to work on it, and hopefully gained some
insight. I have another, unrelated question today about Elasticsearch sinks,
and will ask it in a separate thread.

On Fri, Jan 5, 2018 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Colin,
>
> There are two things that come to my mind:
>
> 1) You mentioned "suspect jobs are grouping by a field of constant
> values". Does that mean that the grouping key is always constant? Flink
> parallelizes the window computation per key, i.e., there is one thread per
> key. Although it would be possible to perform pre-aggregations, this is not
> done yet. There is an effort to add support for this to the DataStream API
> [1]. The Table API will hopefully leverage this once it has been added to
> the DataStream API.
> 2) Another reason for backpressure can be non-aligned watermarks, i.e.,
> the watermarks of different partitions diverge too much from each other. In
> this case, windows cannot be finalized because everything is aligned to the
> lowest watermark.
>
> Hope this helps to clarify things.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-7561
>
> 2017-12-30 0:11 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com
> >:
>
>> Hi Timo and flink-user,
>>
>>
>> It's been a few weeks and we've made some changes to the application
>> mentioned on this email. we've also updated for flink 1.4 . We are using
>> the SQL / Table API with a tumbling window and user defined agg to generate
>> a SQL query string like:
>>
>>
>> SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE).
>>
>>
>>
>> I've experimented with parallelism of the operators and setting the
>> environments parallelism as suggested. I've been setting parallelism values
>> of 2 or 4 to all operators except the consumer and sink.
>>
>>
>> For some jobs with large kafka source topics, under load we experience
>> back pressure and see some lag. But when trying to address via parallelism:
>> so far I've only seen very degraded performance from the increased
>> parallelism settings.
>>
>>
>> Furthermore, the suspect jobs are grouping by a field of constant values.
>> Then these jobs usually have 40,000 or so grouped records enter the
>> aggregator for each minute window.
>>
>>
>>
>> I would think that the tumbling windows would allow the job to process
>> each window in another task slot, parallelizing each window. But maybe
>> that's not happening?
>>
>>
>>
>> Can you help us to understand why parallelizing the job only has a
>> degraded impact on performance and what I can do to change this?
>>
>>
>>
>>
>> Happy New Year!
>>
>>
>>
>> Colin Williams
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Colin,
>>>
>>> unfortunately, selecting the parallelism for parts of a SQL query is not
>>> supported yet. By default, tumbling window operators use the default
>>> parallelism of the environment. Simple project and select operations have
>>> the same parallelism as the inputs they are applied on.
>>>
>>> I think the easiest solution so far is to explicitly set the parallelism
>>> of operators that are not part of the Table API and use the environment's
>>> parallelism to scale the SQL query.
>>>
>>> I hope that helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 12/9/17 um 3:06 AM schrieb Colin Williams:
>>>
>>> Hello,
>>>
>>> I've inherited some flink application code.
>>>
>>> We're currently creating a table using a Tumbling SQL query similar to
>>> the first example in
>>>
>>>  https://ci.apache.org/projects/flink/flink-docs-release-1.3
>>> /dev/table/sql.html#group-windows
>>>
>>> Where each generated SQL query looks something like
>>>
>>> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
>>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>>
>>> We are also using a UDFAGG function in some of the queries which I think
>>> might be cleaned up and optimized a bit (using scala types and possibly not
>>> well implemented)
>>>
>>> We then turn the result table back into a datastream using
>>> toAppendStream, and eventually add a derivative stream to a sink. We've
>>> configured TimeCharacteristic to event-time processing.
>>>
>>> In some streaming scenarios everything is working fine with a
>>> parallelism of 1, but in others it appears that we can't keep up with the
>>> event source.
>>>
>>> Then we are investigating how to enable parallelism specifically on the
>>> SQL table query or aggregator.
>>>
>>> Can anyone suggest a good way to go about this? It wasn't clear from the
>>> documentation.
>>>
>>> Best,
>>>
>>> Colin Williams
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Parallelizing a tumbling group window

2017-12-29 Thread Colin Williams
Hi Timo and flink-user,


It's been a few weeks and we've made some changes to the application
mentioned in this email. We've also updated to Flink 1.4. We are using
the SQL / Table API with a tumbling window and a user-defined aggregate to
generate a SQL query string like:


SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE).



I've experimented with the parallelism of the operators and with setting the
environment's parallelism as suggested. I've been setting parallelism values
of 2 or 4 on all operators except the consumer and sink.


For some jobs with large Kafka source topics, we experience backpressure
under load and see some lag. But when trying to address this via parallelism,
so far I've only seen severely degraded performance from the increased
parallelism settings.


Furthermore, the suspect jobs are grouping by a field of constant values.
Then these jobs usually have 40,000 or so grouped records enter the
aggregator for each minute window.



I would think that the tumbling windows would allow the job to process each
window in another task slot, parallelizing each window. But maybe that's
not happening?



Can you help us understand why parallelizing the job only degrades
performance, and what I can do to change this?
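
For reference, this is a toy sketch of the two knobs in play (names and the
actual source / query are placeholders): the environment parallelism, which
the generated SQL / window operators inherit, versus an explicit parallelism
set on an individual operator outside the Table API.

import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Environment parallelism: this is what the Table API / SQL operators pick up.
    env.setParallelism(4)

    env
      .socketTextStream("localhost", 9999) // stand-in for the Kafka consumer
      .map(_.toUpperCase)
      .setParallelism(1)                   // explicit parallelism for one operator
      .print()
      .setParallelism(1)

    env.execute("parallelism sketch")
  }
}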




Happy New Year!



Colin Williams










On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Colin,
>
> unfortunately, selecting the parallelism for parts of a SQL query is not
> supported yet. By default, tumbling window operators use the default
> parallelism of the environment. Simple project and select operations have
> the same parallelism as the inputs they are applied on.
>
> I think the easiest solution so far is to explicitly set the parallelism
> of operators that are not part of the Table API and use the environment's
> parallelism to scale the SQL query.
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> Am 12/9/17 um 3:06 AM schrieb Colin Williams:
>
> Hello,
>
> I've inherited some flink application code.
>
> We're currently creating a table using a Tumbling SQL query similar to the
> first example in
>
>  https://ci.apache.org/projects/flink/flink-docs-release-1.
> 3/dev/table/sql.html#group-windows
>
> Where each generated SQL query looks something like
>
> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>
> We are also using a UDFAGG function in some of the queries which I think
> might be cleaned up and optimized a bit (using scala types and possibly not
> well implemented)
>
> We then turn the result table back into a datastream using toAppendStream,
> and eventually add a derivative stream to a sink. We've configured
> TimeCharacteristic to event-time processing.
>
> In some streaming scenarios everything is working fine with a parallelism
> of 1, but in others it appears that we can't keep up with the event source.
>
> Then we are investigating how to enable parallelism specifically on the
> SQL table query or aggregator.
>
> Can anyone suggest a good way to go about this? It wasn't clear from the
> documentation.
>
> Best,
>
> Colin Williams
>
>
>
>
>


Re: Flink upgrade compatibility table

2017-12-20 Thread Colin Williams
Hi Kien,

Thanks for the feedback. I wasn't certain about compatibility between jars. I
did bump the Flink library versions and the application started. I was just
curious whether the previous jar would still work without upgrading.

Regarding the savepoint compatibility table: someone should probably add 1.4
information for consistency.


Thanks,

Colin Williams


On Dec 20, 2017 8:16 PM, "Kien Truong" <duckientru...@gmail.com> wrote:

> Hi Colin,
>
> Did you try to rebuild the application with Flink 1.4? You cannot just
> take a jar built with 1.3 and run it on a 1.4 cluster. Afaik, Flink doesn't
> make any guarantee about binary compatibility between major releases, so
> you always have to recompile your application code when you upgrade the
> cluster.
>
> Also, the compatibility table you mentioned is only applicable to
> savepoints, not application code.
>
> Best regards,
> Kien
>
> Sent from TypeApp <http://www.typeapp.com/r?b=11479>
> On Dec 21, 2017, at 09:06, Colin Williams <colin.williams.seattle@gmail.
> com> wrote:
>>
>> I recently tried to launch our application 1.3 jars against a 1.4
>> cluster. I got a java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring when I
>> tried to run our 1.3 flink application against 1.4 .
>>
>> Then I googled around and didn't see a mention of 1.4 in the
>> compatibility table: https://ci.apache.org/projects/flink/flink-docs-
>> release-1.4/ops/upgrading.html#compatibility-table
>>
>> Does 1.4 break compatibility? Maybe the 1.4 docs should be updated to
>> reflect that?
>>
>> Thanks,
>>
>> Colin Williams
>>
>


Flink upgrade compatibility table

2017-12-20 Thread Colin Williams
I recently tried to launch our application's 1.3 jars against a 1.4 cluster.
I got a java.lang.NoClassDefFoundError:
org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring when I
tried to run our 1.3 Flink application against 1.4.

Then I googled around and didn't see a mention of 1.4 in the compatibility
table:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html#compatibility-table

Does 1.4 break compatibility? Maybe the 1.4 docs should be updated to
reflect that?

Thanks,

Colin Williams


Re: flink jobmanager HA zookeeper leadership election - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

2017-12-19 Thread Colin Williams
On Tue, Dec 19, 2017 at 7:29 PM, Colin Williams <
colin.williams.seat...@gmail.com> wrote:

> Hi,
>
> I've been trying to update my flink-docker jobmanager configuration for
> flink 1.4. I think the system is shutting down after a leadership election,
> but I'm not sure what the issue is. My configuration of the jobmanager
> follows
>
>
> jobmanager.rpc.address: 10.16.228.150
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 1024
> blob.server.port: 6124
> query.server.port: 6125
>
> web.port: 8081
> web.history: 10
>
> parallelism.default: 1
>
> state.backend: rocksdb
> state.backend.rocksdb.checkpointdir: /tmp/flink/rocksdb
> state.backend.fs.checkpointdir: file:///var/lib/data/checkpoints
>
> high-availability: zookeeper
> high-availability.cluster-id: /dev
> high-availability.zookeeper.quorum: 10.16.228.190:2181
> high-availability.zookeeper.path.root: /flink-1.4
> high-availability.zookeeper.storageDir: file:///var/lib/data/recovery
> high-availability.jobmanager.port: 50010
>
> env.java.opts: -Dlog.file=/opt/flink/log/jobmanager.log
>
> I'm also attaching some debugging output which shows the shutdown. Again
> I'm not entirely sure it's caused by a leadership issue because it's not
> clear from the debug logs. Can anyone suggest changes I might make to the
> configuration to fix this? I've tried clearing the zookeeper root path in
> case it had some old session information, but that didn't seem to help.
>
> Best,
>
> Colin Williams
>

DEBUG 
org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil  - 
Using select timeout of 500
DEBUG 
org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil  - 
Epoll-bug workaround enabled = false
INFO  akka.remote.Remoting  - Remoting 
started; listening on addresses :[akka.tcp://flink@10.16.228.150:50010]
INFO  org.apache.flink.runtime.jobmanager.JobManager- Actor 
system started at akka.tcp://flink@10.16.228.150:50010
WARN  org.apache.flink.configuration.Configuration  - Config 
uses deprecated configuration key 'high-availability.zookeeper.storageDir' 
instead of proper key 'high-availability.storageDir'
INFO  org.apache.flink.runtime.blob.FileSystemBlobStore - Creating 
highly available BLOB storage directory at 
file:///var/lib/data/recovery//dev/blob
DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore - Created 
highly available BLOB storage directory at 
file:///var/lib/data/recovery//dev/blob
INFO  org.apache.flink.runtime.util.ZooKeeperUtils  - Enforcing 
default ACL for ZK connections
INFO  org.apache.flink.runtime.util.ZooKeeperUtils  - Using 
'/flink-1.4/dev' as Zookeeper namespace.
INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - Starting
DEBUG org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient 
 - Starting
DEBUG org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Starting
DEBUG org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
reset
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client 
environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, 
built on 03/23/2017 10:13 GMT
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:host.name=ip-10-16-228-150.us-east-2.compute.internal
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:java.version=1.8.0_151
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:java.vendor=Oracle Corporation
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client 
environment:java.class.path=/opt/flink/lib/flink-python_2.11-1.4.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.4.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.4.0.jar:::
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client 
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:java.io.tmpdir=/tmp
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:java.compiler=
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:os.name=Linux
INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
Client environment:os.ar

flink jobmanager HA zookeeper leadership election - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

2017-12-19 Thread Colin Williams
Hi,

I've been trying to update my flink-docker jobmanager configuration for
Flink 1.4. I think the system is shutting down after a leadership election,
but I'm not sure what the issue is. My jobmanager configuration is as
follows:


jobmanager.rpc.address: 10.16.228.150
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
blob.server.port: 6124
query.server.port: 6125

web.port: 8081
web.history: 10

parallelism.default: 1

state.backend: rocksdb
state.backend.rocksdb.checkpointdir: /tmp/flink/rocksdb
state.backend.fs.checkpointdir: file:///var/lib/data/checkpoints

high-availability: zookeeper
high-availability.cluster-id: /dev
high-availability.zookeeper.quorum: 10.16.228.190:2181
high-availability.zookeeper.path.root: /flink-1.4
high-availability.zookeeper.storageDir: file:///var/lib/data/recovery
high-availability.jobmanager.port: 50010

env.java.opts: -Dlog.file=/opt/flink/log/jobmanager.log

I'm also attaching some debugging output which shows the shutdown. Again
I'm not entirely sure it's caused by a leadership issue because it's not
clear from the debug logs. Can anyone suggest changes I might make to the
configuration to fix this? I've tried clearing the zookeeper root path in
case it had some old session information, but that didn't seem to help.

Best,

Colin Williams


out
Description: Binary data


docker-flink images and CI

2017-12-14 Thread Colin Williams
I created the following issue:
https://github.com/docker-flink/docker-flink/issues/29 where it was
suggested that I bring this up on the list.

1.4 has been released. Hurray!

But there isn't yet a Dockerfile / image for the release. Furthermore, it
would be nice to have Dockerfiles / images for each RC, so people can
incrementally test features using Docker before the release.

Is it possible to use CI to generate images and add Dockerfiles for each
release?


Parallelizing a tumbling group window

2017-12-08 Thread Colin Williams
Hello,

I've inherited some flink application code.

We're currently creating a table using a Tumbling SQL query similar to the
first example in

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows

Where each generated SQL query looks something like

SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)

We are also using a UDFAGG function in some of the queries which I think
might be cleaned up and optimized a bit (using scala types and possibly not
well implemented)

We then turn the result table back into a datastream using toAppendStream,
and eventually add a derivative stream to a sink. We've configured
TimeCharacteristic to event-time processing.

In some streaming scenarios everything is working fine with a parallelism
of 1, but in others it appears that we can't keep up with the event source.

So we are now investigating how to enable parallelism specifically on the SQL
table query or aggregator.

Can anyone suggest a good way to go about this? It wasn't clear from the
documentation.

Best,

Colin Williams


Re: Docker and AWS taskmanager configuration

2017-11-21 Thread Colin Williams
Hi Patrick "unique-dns-address"  is an alias for private IP. If XXX-XX-XX-XXX
is the private IP, then ip-XXX-XX-XX-XXX.aws-region-X is the
"unique-dns-address".
We were using auto scaling groups.

What worked out with the givens above was setting docker --net=host , and
then we saw "unique-dns-address" was set for the taskmanagers akka address.
Given we are ok with 1 taskmanager container per host, this all worked out.

Thanks,

Colin Williams



On Tue, Nov 21, 2017 at 6:14 AM, Patrick Lucas <patr...@data-artisans.com>
wrote:

> Hi Colin,
>
> Is each instance's "unique-dns-address" equal to the hostname of the
> instance, or is the hostname something else? If it's different from the
> hostname, you're correct in assuming you need to configure each node to
> advertise its unique-dns-address instead.
>
> Are the unique-dns-addresses aliases for public or private IPs? I.e. in
> your example of a unique-dns-address do the X's map to the private IP of
> the instance or some public IP? If I recall correctly, in AWS (at least
> within a VPC), instance's public IPs are not actually bound to the instance
> itself and are more like a NAT/DMZ address, meaning you can't actually bind
> a port to them. This might work differently in EC2-Classic.
>
> If you ensure that each node advertises a bindable, resolvable name or IP
> address—with jobmanager.rpc.address on the jobmanager and
> taskmanager.hostname on the taskmanager—then they should all be able to
> discover, address, and communicate with each other with no problems.
>
> --
> Patrick Lucas
>
> On Tue, Nov 21, 2017 at 6:44 AM, Colin Williams <
> colin.williams.seat...@gmail.com> wrote:
>
>> Hi,
>>
>> We noticed that we couldn't parallelize our Flink Docker containers, and
>> this looks like an issue that others have experienced. In our environment
>> we were not setting any hostname in the Flink configuration. This worked
>> for the single node, but the taskmanagers then hit an exception similar to
>> what others have reported:
>>
>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
>> Partition 2ec10eeac8969a16945b3713b63c0f4f@11052a989c7921423e44653285481e23 
>> not found.
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>>> at 
>>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>>> at 
>>> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>>> at akka.dispatch.OnComplete.internal(Future.scala:248)
>>> at akka.dispatch.OnComplete.internal(Future.scala:245)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>> at 
>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at 
>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorker

Docker and AWS taskmanager configuration

2017-11-20 Thread Colin Williams
Hi,

We noticed that we couldn't parallelize our Flink Docker containers, and
this looks like an issue that others have experienced. In our environment we
were not setting any hostname in the Flink configuration. This worked for
the single node, but the taskmanagers then hit an exception similar to what
others have reported:

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition 2ec10eeac8969a16945b3713b63c0f4f@11052a989c7921423e44653285481e23
not found.
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>   at 
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
In our AWS environment we are only running one container per EC2 instance,
and each instance has a "unique-dns-address" associated with it. The
unique-dns-address looks like ip-XXX-XX-XX-XXX.aws-region-X.

So that we don't have to do any additional DNS configuration, it would be
convenient to use this DNS address for the taskmanagers to talk to each
other.

I tested that I could reach each taskmanager at its unique-dns-address via
telnet to one of the taskmanager ports, and I was able to connect. This made
me think that setting taskmanager.hostname to the address would solve my
issue.

However, when I tried to set taskmanager.hostname: unique-dns-address in
flink-conf.yaml, I ended up with a java.net.BindException: Cannot assign
requested address. I'm not entirely sure why this happened.

But I looked around and found some other list message that mentioned
https://doc.akka.io/docs/akka/2.4.1/additional/faq.html / RE: remote actor.

So I set *akka.remote.netty.tcp.hostname*: unique-dns-address for each
instance. However, I was not certain which taskmanager port to use for
akka.remote.netty.tcp.port, so I left it unset. When I tried again I hit the
same PartitionNotFoundException.

I realize this is a complicated issue which varies with each environment,
but I am asking for advice on other things I could try to tackle it.

Furthermore, if I'm on the right track, which taskmanager service port
should correspond to akka.remote.netty.tcp.port?


Re: Streaming User-defined Aggregate Function gives exception when using Table API jars

2017-11-15 Thread Colin Williams
Thank you Fabian for fixing that. The highlight of my day today.


On Nov 15, 2017 1:01 AM, "Fabian Hueske" <fhue...@gmail.com> wrote:

> Hi Colin,
>
> thanks for reporting the bug. I had a look at it and it seems that the
> wrong classloader is used when compiling the code (both for the batch as
> well as the streaming queries).
> I have a fix that I need to verify.
>
> It's not necessary to open a new JIRA for that. We can cover all cases
> under FLINK-7490.
>
> Thanks, Fabian
>
> 2017-11-15 5:32 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com
> >:
>
>> From the documentation there is a note which instructs not to include the
>> flink-table dependency into the project. However when I put the flink-table
>> dependency on the cluster the User-defined Aggregate Function gives an
>> Exception.
>>
>> When I do include the flink-table into the dependencies, the project runs
>> just fine. However I'd expect that there will then be garbage collection
>> issues.
>>
>> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
>> where I made a comment. I believe the issue is likely related to the
>> classloading as suggested, but the related classes are different (Batch vs
>> Stream).
>>
>> Should another bug report be filed?
>>
>> Also that bug report hasn't really had any activity and it's been a few
>> months.
>>
>> Best Regards,
>>
>> Colin Williams
>>
>>
>> java.io.IOException: Exception while applying AggregateFunction in
>> aggregating state
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.
>> add(HeapAggregatingState.java:91)
>> at org.apache.flink.streaming.runtime.operators.windowing.Windo
>> wOperator.processElement(WindowOperator.java:442)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:206)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:69)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:263)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
>> program cannot be compiled. This is a bug. Please file an issue.
>> at org.apache.flink.table.codegen.Compiler$class.compile(
>> Compiler.scala:36)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunctio
>> n.compile(AggregateAggFunction.scala:33)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunctio
>> n.initFunction(AggregateAggFunction.scala:72)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunctio
>> n.createAccumulator(AggregateAggFunction.scala:41)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunctio
>> n.createAccumulator(AggregateAggFunction.scala:33)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState$Agg
>> regateTransformation.apply(HeapAggregatingState.java:115)
>> at org.apache.flink.runtime.state.heap.NestedMapsStateTable.tra
>> nsform(NestedMapsStateTable.java:298)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.
>> add(HeapAggregatingState.java:89)
>> ... 6 more
>> Caused by: org.codehaus.commons.compiler.CompileException: Line 6,
>> Column 14: Cannot determine simple type name "com"
>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompil
>> er.java:6416)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompil
>> er.java:6177)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompil
>> er.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompil
>> er.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompil
>> er.java:6190)
>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
>> at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(Uni
>> tCompiler.java:6064)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(Uni
>> tCompiler.java:6059)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
>> at org.

Streaming User-defined Aggregate Function gives exception when using Table API jars

2017-11-14 Thread Colin Williams
From the documentation, there is a note which instructs not to include the
flink-table dependency in the project. However, when I instead put the
flink-table jar on the cluster, the user-defined aggregate function throws an
exception.

When I do include flink-table in the project dependencies, the job runs just
fine. However, I'd expect that this will then lead to garbage-collection
issues.

This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
where I made a comment. I believe the issue is likely related to
classloading, as suggested there, but the classes involved are different
(batch vs. stream).

Should another bug report be filed?

Also that bug report hasn't really had any activity and it's been a few
months.

Best Regards,

Colin Williams


java.io.IOException: Exception while applying AggregateFunction in
aggregating state
at
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:91)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table
program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(AggregateAggFunction.scala:33)
at
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.initFunction(AggregateAggFunction.scala:72)
at
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:41)
at
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:33)
at
org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:115)
at
org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(NestedMapsStateTable.java:298)
at
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:89)
... 6 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column
14: Cannot determine simple type name "com"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
at
org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4048)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4109)
at org.codehaus.janino.UnitCompiler.access$6600(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4051)
at
org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4048)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
at org.codehaus.janino

Re: Writing an Integration test for flink-metrics

2017-10-23 Thread Colin Williams
Thanks for the help. I ended up creating a custom metric reporter and
accessing its fields in an integration test. However, I do think that the
test in the PR Martin linked is another good way to test. I opened
https://issues.apache.org/jira/browse/FLINK-7907 regarding the missing
Scala examples in the metrics documentation. I can take a stab at that over
the weekend if somebody doesn't beat me to it.
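
For anyone finding this later, the reporter I mean is along the lines of the
sketch below (Scala; class name and metric handling are illustrative, not the
exact code we use). It stashes every registered Counter in a static map so
the test can assert on the counts after the job has run; this only works when
the job runs in a local mini-cluster in the same JVM, and the reporter gets
wired in through the metrics.reporter.<name>.class setting of the test
cluster's configuration.

import java.util.concurrent.ConcurrentHashMap

import org.apache.flink.metrics.{Counter, Metric, MetricConfig, MetricGroup}
import org.apache.flink.metrics.reporter.MetricReporter

object CollectingReporter {
  // Shared between the reporter instance running inside the local cluster
  // and the test code doing the assertions.
  val counters = new ConcurrentHashMap[String, Counter]()
}

class CollectingReporter extends MetricReporter {
  override def open(config: MetricConfig): Unit = ()
  override def close(): Unit = ()

  override def notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    metric match {
      case c: Counter => CollectingReporter.counters.put(name, c)
      case _          => // gauges, meters and histograms are ignored in this sketch
    }

  override def notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    CollectingReporter.counters.remove(name)
}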

On Fri, Oct 13, 2017 at 6:09 AM, Martin Eden <martineden...@gmail.com>
wrote:

> Hi,
> Not merged in yet but this is an example pr that is mocking metrics and
> checking they are properly updated:
> https://github.com/apache/flink/pull/4725
>
>
> On Fri, Oct 13, 2017 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> I think we could add this functionality to the (operator) test harnesses.
>> I.e. add a mock MetricGroup thingy in there that you can query to check the
>> state of metrics.
>>
>>
>> On 13. Oct 2017, at 13:50, Chesnay Schepler <ches...@apache.org> wrote:
>>
>> I meant that you could unit-test the behavior of the function in
>> isolation. You could create a dummy metric group that
>> verifies that the correct counters are being registered (based on names i
>> guess), as well as provide access to them.
>> Mock some input and observe whether the counter value is being modified.
>>
>> Whether this is a viable option depends a bit on the complexity of the
>> function of course, that is, how much mocking you would have to do.
>>
>> On 13.10.2017 11:18, Piotr Nowojski wrote:
>>
>> For testing Flink applications in general you can read
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
>>
>> However, as we said before, testing metrics would require using a custom
>> or a JMX reporter.
>>
>> Yes, please report this bug in Jira.
>>
>> Thanks, Piotrek
>>
>> On 13 Oct 2017, at 04:31, Colin Williams <colin.williams.seattle@gmail.
>> com> wrote:
>>
>> Team wants an integration test, I'm not sure what unit test you had in
>> mind. Actually feel that I've been trying to avoid the reporter method but
>> that would be more end to end.
>>
>> The documentation for metrics and Scala are missing with the exception of
>> Gauge: https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> monitoring/metrics.html . Should I file a issue against that?
>>
>> Then it leaves you guessing a little bit how to implement Counters. One
>> approach tried was using objects
>>
>> object PointFilter extends RichMapFunction[...
>>
>>   @transient lazy val someCounter = 
>> getRuntimeContext.getMetricGroup.counter(...)
>>
>>
>> This allowed access to the counter before and after execution . However
>> between the unit tests the Counter kept its value also and that's a no for
>> the test. Think that might be an issue with ScalaTest.
>>
>> I've tried to get at the counter from some other directions like trying
>> to find a way to inject a reporter to get it's state. But don't see a way
>> to do it. So probably the best thing to do is fire up something to collect
>> the metrics from the reporter.
>>
>> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> Well damn, i should've read the second part of the initial mail.
>>>
>>> I'm wondering though, could you not unit-test this behavior?
>>>
>>>
>>> On 12.10.2017 14:25, Chesnay Schepler wrote:
>>>
>>>> You could also write a custom reporter that opens a socket or similar
>>>> for communication purposes.
>>>>
>>>> You can then either query it for the metrics, or even just trigger the
>>>> verification in the reporter,
>>>> and fail with an error if the reporter returns an error.
>>>>
>>>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Doing as you proposed using JMXReporter (or custom reporter) should
>>>>> work. I think there is no easier way to do this at the moment.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 12 Oct 2017, at 04:58, Colin Williams <
>>>>>> colin.williams.seat...@gmail.com> wrote:
>>>>>>
>>>>>> I have a RichMapFunction and I'd like to ensure Meter fields are
>>>>>> properly incremented. I've been trying to think of the best way to do 
>>>>>> this.
>>>>>> Currently I think that I'd need to either implement my own reporter (or 
>>>>>> use
>>>>>> JMX) and write to a socket, create a listener and wait for the reporter 
>>>>>> to
>>>>>> send the message.
>>>>>>
>>>>>> Is this a good approach for writing the test, or should I be
>>>>>> considering something else?
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>


Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Colin Williams
The team wants an integration test; I'm not sure what unit test you had in
mind. Actually, I feel I've been trying to avoid the reporter approach, but
that would be more end-to-end.

The Scala examples in the metrics documentation are missing, with the
exception of Gauge:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
Should I file an issue against that?

That leaves you guessing a little bit about how to implement Counters. One
approach I tried was using an object:

object PointFilter extends RichMapFunction[...

  @transient lazy val someCounter = getRuntimeContext.getMetricGroup.counter(...)


This allowed access to the counter before and after execution. However, the
Counter kept its value between the unit tests as well, and that's a no-go for
the test. I think that might be an issue with ScalaTest.

I've tried to get at the counter from some other directions, like trying to
find a way to inject a reporter and read its state, but I don't see a way to
do it. So probably the best thing to do is fire up something to collect the
metrics from the reporter.
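
(One thing I realize writing this up: since PointFilter is a singleton
object, the lazy val is initialized once per JVM and then shared by every
test, which would explain the value carrying over. A sketch of using a class
instead, so each test gets a fresh instance and a fresh counter; the type
parameters and map body are just placeholders:)

import org.apache.flink.api.common.functions.RichMapFunction

class PointFilter extends RichMapFunction[String, String] {
  // Re-created per instance, so a new PointFilter in each test starts at zero.
  @transient lazy val someCounter =
    getRuntimeContext.getMetricGroup.counter("someCounter")

  override def map(value: String): String = {
    someCounter.inc()
    value
  }
}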

On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler <ches...@apache.org>
wrote:

> Well damn, i should've read the second part of the initial mail.
>
> I'm wondering though, could you not unit-test this behavior?
>
>
> On 12.10.2017 14:25, Chesnay Schepler wrote:
>
>> You could also write a custom reporter that opens a socket or similar for
>> communication purposes.
>>
>> You can then either query it for the metrics, or even just trigger the
>> verification in the reporter,
>> and fail with an error if the reporter returns an error.
>>
>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>
>>> Hi,
>>>
>>> Doing as you proposed using JMXReporter (or custom reporter) should
>>> work. I think there is no easier way to do this at the moment.
>>>
>>> Piotrek
>>>
>>> On 12 Oct 2017, at 04:58, Colin Williams <colin.williams.seattle@gmail.
>>>> com> wrote:
>>>>
>>>> I have a RichMapFunction and I'd like to ensure Meter fields are
>>>> properly incremented. I've been trying to think of the best way to do this.
>>>> Currently I think that I'd need to either implement my own reporter (or use
>>>> JMX) and write to a socket, create a listener and wait for the reporter to
>>>> send the message.
>>>>
>>>> Is this a good approach for writing the test, or should I be
>>>> considering something else?
>>>>
>>>
>>>
>>
>>
>


Writing an Integration test for flink-metrics

2017-10-11 Thread Colin Williams
I have a RichMapFunction and I'd like to ensure Meter fields are properly
incremented. I've been trying to think of the best way to do this.
Currently I think that I'd need to either implement my own reporter (or use
JMX) and write to a socket, create a listener and wait for the reporter to
send the message.

Is this a good approach for writing the test, or should I be considering
something else?


Re: RichMapFunction parameters in the Streaming API

2017-10-11 Thread Colin Williams
Thanks for the detailed explanation of the reasoning behind not using
open()'s configuration parameters!
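
For anyone skimming the archive, a minimal sketch of the constructor approach
Chesnay describes below (the parameter and types are made up): the value is
stored in a field of the function, so it is serialized along with the
function instead of being read from open()'s Configuration.

import org.apache.flink.api.common.functions.RichMapFunction

// Pass parameters through the constructor and keep them in a field.
class ThresholdFilter(threshold: Long) extends RichMapFunction[Long, Boolean] {
  override def map(value: Long): Boolean = value >= threshold
}

// Usage: stream.map(new ThresholdFilter(42L))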

On Wed, Oct 11, 2017 at 1:46 AM, Chesnay Schepler <ches...@apache.org>
wrote:

> The Configuration parameter in open() is a relic of the previous java API
> where operators were instantiated generically.
>
> Nowadays, this is no longer the case as they are serialized instead, which
> simplifies the passing of parameters as you can
> simply store them in a field of your UDF.
>
> The configuration object passed to open() in case of the streaming API is
> always empty, and we don't plan
> to implement it since it provides little value due to the above.
>
> As such, we suggest to pass either the parameter tool, configuration
> instance or specific parameters through the constructor of user-defined
> functions and store them in a field. This applies both to the batch and
> streaming API.
>
> Personally I would stay away from the global configuration option as it is
> more brittle than the constructor approach, which makes
> it explicit that this function requires these parameters.
>
>
> On 11.10.2017 00:36, Colin Williams wrote:
>
> I was looking for withParameters(config) in the Streaming API today. I
> stumbled across the following thread.
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.
> com/withParameters-for-Streaming-API-td9332.html#a9333
>
> It appears that some of the StreamingAPI developers are in favor of
> removing the parameters from RichMapFunctions' open. However the best
> practices article
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_
> practices.html#using-the-parameters-in-your-flink-program
>
> Show examples of using both global configuration (where parameters are
> available from open) and withParameters(config) (which doesn't work from
> the Streaming API)
>
> I'm trying to make a decision regarding using global parameters with my
> Flink Streaming jobs.
>
> Is using the global configuration a good idea for parameters in the
> Streaming API or is this best practice just suggested for the Batch API?
>
> Is there a reason for the opinion of removing the configuration parameters
> from open?
>
>
>
>
>


RichMapFunction parameters in the Streaming API

2017-10-10 Thread Colin Williams
I was looking for withParameters(config) in the Streaming API today. I
stumbled across the following thread.

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/withParameters-for-Streaming-API-td9332.html#a9333

It appears that some of the Streaming API developers are in favor of
removing the parameters from RichMapFunction's open(). However, the best
practices article

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program

shows examples of using both the global configuration (where parameters are
available from open()) and withParameters(config) (which doesn't work from
the Streaming API).

I'm trying to make a decision regarding using global parameters with my
Flink Streaming jobs.

Is using the global configuration a good idea for parameters in the
Streaming API or is this best practice just suggested for the Batch API?

Is there a reason behind the opinion that the configuration parameters
should be removed from open()?
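
To make the option I'm weighing concrete, here is a small sketch of the
global-configuration style from the best-practices page (the parameter name
and mapper are made up; this assumes the 1.3-era open(Configuration)
signature):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

class OffsetMapper extends RichMapFunction[Long, Long] {
  var offset: Long = 0L

  // Read the globally registered parameters back in open(), instead of
  // relying on the Configuration argument (which is empty in the Streaming API).
  override def open(config: Configuration): Unit = {
    val p = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    offset = p.getLong("offset", 0L)
  }

  override def map(value: Long): Long = value + offset
}

object GlobalParamsSketch {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(params) // register parameters globally

    env.fromElements(1L, 2L, 3L)
      .map(new OffsetMapper)
      .print()

    env.execute("global parameters sketch")
  }
}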


Re: serialization error when using multiple metrics counters

2017-10-09 Thread Colin Williams
Thanks everyone, and thank you very much Seth! Adding @transient to the
lazy vals is what I needed.
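
In case it helps anyone else hitting the NotSerializableException on
SimpleCounter, the fixed function ends up looking roughly like this (the type
parameters and map body are placeholders; the counter names are the ones from
my earlier mail):

import org.apache.flink.api.common.functions.RichMapFunction

class ParsedResultUnwrapper extends RichMapFunction[String, String] {
  // @transient keeps the non-serializable Counter instances out of the
  // serialized closure; the lazy vals are re-evaluated on the task managers
  // after deserialization, once a runtime context is available.
  @transient lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
  @transient lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
  @transient lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

  // Placeholder body: the real function parses the line and increments the
  // matching counter.
  override def map(line: String): String = {
    successCounter.inc()
    line
  }
}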

On Mon, Oct 9, 2017 at 1:34 PM, Seth Wiesman <swies...@mediamath.com> wrote:

> When a Scala class contains a single lazy val, it is implemented using a
> boolean flag to track whether the field has been evaluated. When a class
> contains multiple lazy vals, it is implemented as a bit mask shared amongst
> the variables. This can lead to inconsistencies as to whether serialization
> forces evaluation of the field; in general, lazy vals should always be
> marked @transient for expected behavior.
>
>
>
> Seth
>
>
>
> *From: *Stephan Ewen <se...@apache.org>
> *Date: *Monday, October 9, 2017 at 2:44 PM
> *To: *Kostas Kloudas <k.klou...@data-artisans.com>
> *Cc: *Colin Williams <colin.williams.seat...@gmail.com>, user <
> user@flink.apache.org>
> *Subject: *Re: serialization error when using multiple metrics counters
>
>
>
> Interesting, is there a quirk in Scala where using multiple lazy variables
> can result in eager initialization of some of them?
>
>
>
> On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
> Hi Colin,
>
>
>
> Are you initializing your counters from within the open() method of your
> rich function?
>
> In other words, are you calling
>
>
>
> counter = getRuntimeContext.getMetricGroup.counter(“my counter”)
>
>
>
> from within the open().
>
>
>
> The Counter interface is not serializable. So if you instantiate the
> counters outside of open(), then when Flink tries to ship your code to the
> cluster, it cannot, and you get the exception.
>
>
>
> You can have a look at the docs for an example:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/
> metrics.html
>
>
>
> Thanks,
>
> Kostas
>
>
>
> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.
> com> wrote:
>
>
>
> I've created a RichMapFunction in scala with multiple counters like:
>
>
>
>lazy val successCounter = getRuntimeContext.getMetricGroup.counter("
> successfulParse")
>
>lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("
> failedParse")
>
>lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("
> errorParse")
>
>
>
> which I increment in the map function. While testing I noticed that I have
> no issues with using a single counter. However with multiple counters I get
> a serialization error using more than one counter.
>
>
>
> Does anyone know how I can use multiple counters from my RichMapFunction,
> or what I'm doing wrong?
>
>
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The
> implementation of the RichMapFunction is not serializable. The object
> probably contains or references non serializable fields.
>
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:100)
>
> [info]   at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.
> clean(DataStream.java:183)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
> DataStream.java:527)
>
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(
> DataStream.scala:581)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
> ParsedResultUnwrapperTest.scala:27)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
> ParsedResultUnwrapperTest.scala:23)
>
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
>
> [info]   ...
>
> [info]   Cause: java.io.NotSerializableException:
> org.apache.flink.metrics.SimpleCounter
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1184)
>
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
>
> [info]   at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1509)
>
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>
> [info]   at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.java:348)
>
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
> InstantiationUtil.j

serialization error when using multiple metrics counters

2017-10-07 Thread Colin Williams
I've created a RichMapFunction in scala with multiple counters like:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing I noticed that I have
no issues when using a single counter. However, I get a serialization error
as soon as I use more than one counter.

Does anyone know how I can use multiple counters from my RichMapFunction,
or what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The
implementation of the RichMapFunction is not serializable. The object
probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(
DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.
SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(
ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(
ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(
ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a
Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The
implementation of the RichMapFunction is not serializable. The object
probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(
DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.
SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(
ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(
ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(
ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   ...