Re: does the flink sink only support bio?

2018-01-08 Thread Tony Wei
Hi Stefan,

Your reply really helps me a lot. Thank you.

2018-01-08 19:38 GMT+08:00 Stefan Richter :

> Hi,
>
> 1.  If `snapshotState` failed at the first checkpoint, does it mean there
> is no state and no transaction can be aborted by default?
>
>
> This is a general problem and not only limited to the first checkpoint.
> Whenever you open a transaction, there is no guaranteed way to store it in
> persistent state to abort it in case of failure. In theory, your job can
> crash at any point after you just opened a transaction. So in the end I
> guess you must rely on something like a timeout-based mechanism. You can
> do some _best effort_ to proactively cancel uncommitted transactions
> through mechanisms such as keeping them in state, listing them in files, or having a fixed pool
> of transaction ids and iterating over all of them for cancellation on a restart.
>
> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has
> multiple ids to be reused by producer, and it aborts all ids in this pool
> in the `initializeState`. Is this pool designed for the situation in the
> first problem or something I haven't noticed?
>
>
> This implementation is very specific for KafkaProducer and is not
> necessarily a good blueprint for what you are planning. In particular, in
> this case there is a fixed and limited universe of all potential
> transaction ids that a task can potentially (re)use, so after a restart
> without state we can simply iterate all possible transaction ids and issue
> a cancel for all of them. In general, you don’t always know all possible
> transaction ids in a way that allows you to opportunistically cancel all
> potential orphaned transactions.
>
> 2018-01-04 22:15 GMT+08:00 Stefan Richter :
>
>> Yes, that is how it works.
>>
>> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo :
>> >
>> > The TwoPhaseCommitSinkFunction seems to record the transaction status
>> > in the state just like what I imagine above, correct?
>> > and if the progress fails before commit, in the later restart, the
>> > commit would be triggered again, correct? So the commit would not be
>> > forgotten, correct?
>> >
>> > 2018-01-03 22:54 GMT+08:00 Stefan Richter > >:
>> >> I think a mix of async UPDATES and exactly-once all this might be
>> tricky,
>> >> and the typical use case for async IO is more about reads. So let’s
>> take a
>> >> step back: what would you like to achieve with this? Do you want a
>> >> read-modify-update (e.g. a map function that queries and updates a DB)
>> or
>> >> just updates (like a sink based that goes against a DB). From the
>> previous
>> >> question, I assume the second case applies, in which case I wonder why
>> you
>> >> even need to be async for a sink? I think a much better approach could
>> be
>> >> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
>> >> batching to lower update costs.
>> >>
>> >> On top of the TwoPhaseCommitSinkFunction, you could implement
>> transactions
>> >> against your DB, similar to e.g. this example with Postgres:
>> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transa
>> ction-manager-that-works-with-postgresql/
>> >> .
>> >>
>> >> Does this help or do you really need async read-modify-update?
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>
>> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
>> >>
>> >> No, I mean how to implement exactly-once db commit (given our async io
>> >> target is mysql), not the state used by flink.
>> >> As mentioned in previous mail, if I commit db in
>> >> notifyCheckpointComplete, we have a risk to lost data (lost commit,
>> >> and flink restart would not trigger notifyCheckpointComplete for the
>> >> last checkpoint again).
>> >> On the other hand, if I update and commit per record, the sql/stored
>> >> procedure have to handle duplicate updates at failure restart.
>> >>
>> >> So, when or where to commit so that we could get exactly-once db
>> ingress.
>> >>
>> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter > >:
>> >>
>> >>
>> >> Hi,
>> >>
>> >>
>> >> Then how to implement exactly-once async io? That is, neither missing
>> >> data or duplicating data.
>> >>
>> >>
>> >> From the docs about async IO here
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/stream/asyncio.html
>> >> :
>> >>
>> >> "Fault Tolerance Guarantees:
>> >> The asynchronous I/O operator offers full exactly-once fault tolerance
>> >> guarantees. It stores the records for in-flight asynchronous requests
>> in
>> >> checkpoints and restores/re-triggers the requests when recovering from
>> a
>> >> failure.“
>> >>
>> >> So it is already handled by Flink in a way that supports exactly-once.
>> >>
>> >> Is there some way to index data by checkpoint id and records which
>> >> checkpoints already commit to db? But that means we need MapState,
>> >> right?
>> >>
>> >>
>> >> The 

Flink 1.4.0 Mapr libs issues

2018-01-08 Thread ani.desh1512
*Background:* We have a setup of Flink 1.3.2 along with a secure MAPR
(v5.2.1) cluster (Flink is running on mapr client nodes). We run this flink
cluster via flink-jobmanager.sh foreground and flink-taskmanager.sh
foreground command via Marathon. We want to upgrade to Flink 1.4.0.

Since we require MapR libraries, here's how I built Flink 1.4.0 from source
(Maven version 3.5.0):
1. Cloned the flink repo
2. Checked out the release-1.4.0 tag
3. mvn clean install -DskipTests -Pvendor-repos,mapr
-Dhadoop.version=2.7.0-mapr-1703 -Dzookeeper.version=3.4.5-mapr-1604
4. cd flink-dist
5. mvn clean install

Now, we have a Java Flink application that writes to MapR-DB (via the HBase
API). This jar runs without any error on Flink 1.3.2.
We then changed the Flink version in the jar to 1.4.0 and tried running
it on the newly created Flink 1.4.0 cluster, but we get the following error:

java.lang.UnsatisfiedLinkError:
com.mapr.security.JNISecurity.SetParsingDone()V
at com.mapr.security.JNISecurity.SetParsingDone(Native Method)
at
com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:231)
at
com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.(CLDBRpcCommonUtils.java:73)
at
com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.(CLDBRpcCommonUtils.java:63)
at
org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2159)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2374)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2591)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2543)
at 
org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2456)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:994)
at 
org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1044)
at 
org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1445)
at
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:69)
at
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:83)
at
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:98)
at com.kabbage.maprdb.HBaseUtils.(HBaseUtils.java:17)
at com.kabbage.maprdb.HBaseSink.open(HBaseSink.java:18)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

We have set the FLINK_CLASSPATH variable to point to our MapR libs.
I saw in the Flink 1.4.0 release notes that there are changes to the dynamic
class loading of user code. So my question is: what extra or different
steps do I need to take to make Flink work with the MapR libraries again?
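
For reference, here is a minimal flink-conf.yaml sketch of the class-loading switch that is
suggested later in this digest for a similar Hadoop-dependency clash; whether it is sufficient
for the MapR native libraries is an assumption:

    # resolve classes through the parent (lib/ + FLINK_CLASSPATH) classloader first,
    # so the MapR/Hadoop classes are not loaded a second time from the user jar
    classloader.resolve-order: parent-first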

Thanks,
Aniket




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-08 Thread Kyle Hamlin
When I change the path from an s3 path to a local path I get the following
error:

Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: 5403f2bf4a71abf745bd1ed93c8feb25. Waiting for
job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id ----.
01/08/2018 16:14:59 Job execution switched to status RUNNING.
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to FAILED
java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://localhost:12345/
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException

On Mon, Jan 8, 2018 at 9:21 PM Kyle Hamlin  wrote:

> Here is the task manager log:
>
> 2018-01-08 16:16:13,406 INFO
> org.apache.flink.runtime.taskmanager.TaskManager - Received task Source:
> Kafka -> Sink: S3 (1/1)
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
> from CREATED to DEPLOYING.
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
> Creating FileSystem stream leak safety net for task Source: Kafka -> Sink:
> S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING]
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
> Loading JAR files for task Source: Kafka -> Sink: S3 (1/1)
> (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.blob.BlobClient -
> Downloading
> b4a2b46051079212bdab65e50ee5ab03/p-bfab322ceec8791627d2191ca955ba74f4ec4dc3-d2d320b15315ce3b67038f336209bb2c
> from localhost/127.0.0.1:49710
> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task -
> Registering task at network: Source: Kafka -> Sink: S3 (1/1)
> (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
> from DEPLOYING to RUNNING.
> 2018-01-08 16:16:13,596 INFO
> org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined
> state backend: File State Backend @ s3://my-bucket/checkpoints.
> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
> from RUNNING to FAILED.
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at
> 

Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-08 Thread Kyle Hamlin
Here is the task manager log:

2018-01-08 16:16:13,406 INFO
org.apache.flink.runtime.taskmanager.TaskManager - Received task Source:
Kafka -> Sink: S3 (1/1)
2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
from CREATED to DEPLOYING.
2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
Creating FileSystem stream leak safety net for task Source: Kafka -> Sink:
S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING]
2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task -
Loading JAR files for task Source: Kafka -> Sink: S3 (1/1)
(bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.blob.BlobClient -
Downloading
b4a2b46051079212bdab65e50ee5ab03/p-bfab322ceec8791627d2191ca955ba74f4ec4dc3-d2d320b15315ce3b67038f336209bb2c
from localhost/127.0.0.1:49710
2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task -
Registering task at network: Source: Kafka -> Sink: S3 (1/1)
(bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task -
Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
from DEPLOYING to RUNNING.
2018-01-08 16:16:13,596 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined
state backend: File State Backend @ s3://my-bucket/checkpoints.
2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task -
Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched
from RUNNING to FAILED.
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
at
org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Source: Kafka -> Sink: S3 (1/1)
(bc932736c6526eb1bd41f6aaa73b2997).
2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Source: Kafka -> Sink:
S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [FAILED]
2018-01-08 16:16:13,631 INFO
org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and
sending final execution state FAILED to JobManager for task Source: Kafka
-> Sink: S3 (bc932736c6526eb1bd41f6aaa73b2997)
2018-01-08 16:16:13,635 INFO
org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results
produced by task execution bc932736c6526eb1bd41f6aaa73b2997
2018-01-08 16:16:23,641 INFO
org.apache.flink.runtime.taskmanager.TaskManager - Received task Source:
Kafka -> Sink: S3 (1/1)
2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task -
Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched
from CREATED to DEPLOYING.
2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task -
Creating FileSystem stream leak safety net for task Source: Kafka -> Sink:
S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING]
2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task -
Loading JAR files for task Source: Kafka -> Sink: S3 (1/1)
(235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].
2018-01-08 16:16:23,643 INFO org.apache.flink.runtime.taskmanager.Task -
Registering task at network: Source: Kafka -> Sink: S3 (1/1)
(235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].

Re: Back-pressure Status shows OK but records are backed up in kafka

2018-01-08 Thread Jins George
Thank you Ufuk & Shannon. Since my Kafka consumer is an
UnboundedKafkaSource from Beam, I'm not sure if the records-lag-max metric is
exposed. Let me research further.


Thanks,
Jins George
On 01/08/2018 10:11 AM, Shannon Carey wrote:

Right, backpressure only measures backpressure on the inside of the Flink job. 
Ie. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max,
 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, the Task index, etc. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.

Flink manages its offsets itself, rather than acting like a “normal” consumer 
which commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you 
can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor 
or https://github.com/linkedin/Burrow etc. which polls Kafka itself and 
produces metrics about consumer lag. However, for some reason, I don’t see our 
Flink consumer metrics showing up in our lag monitoring tool or in the Kafka 
command-line tools, so I’m not sure what’s going on there. Maybe it’s because 
Flink doesn’t show up as a consumer group? At first I thought that it might be 
because we’re not setting the “group.id” property, but as it turns out we are 
indeed setting it. In any case, we have to use the job’s metrics, and monitor 
that the job is up, rather than monitoring the offset in Kafka itself.

-Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi"  wrote:

 Hey Jins,
 
 our current back pressure tracking mechanism does not work with Kafka

 sources. To gather back pressure indicators we sample the main task
 thread of a subtask. For most tasks, this is the thread that emits
 records downstream (e.g. if you have a map function) and everything
 works as expected. In case of the Kafka source though there is a
 separate thread that consumes from Kafka and emits the records.
 Therefore we sample the "wrong" thread and don't observe any
 indicators for back pressure. :-( Unfortunately, this was not taken
 into account when back pressure sampling was implemented.
 
 There is this old issue to track this:

 https://issues.apache.org/jira/browse/FLINK-3456
 
 I'm not aware of any other way to track this situation. Maybe others

 can chime in here...
 
 – Ufuk
 
 
 On Mon, Jan 8, 2018 at 8:16 AM, Jins George  wrote:

 > I have a Beam Pipeline consuming records from Kafka doing some
 > transformations and writing it to Hbase. I faced an issue in which 
records
 > were writing to Hbase at a slower rate than the incoming messages to 
Kafka
 > due to a temporary surge in the incoming traffic.
 >
 > From the flink UI, if I check the back pressure status, it shows OK. I 
have
 > one task which has all the operators including source.
 >
 > Any idea why backpressure indicator would show OK, but messages are 
backed
 > up in Kafka.
 >
 > Is there any other mechanism/metrics by which I can identify this 
situation
 > ?
 >
 > I'm running Flink 1.2/w beam 2.0.
 >
 > Thanks,
 > Jins George
 
 






Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-08 Thread Kyle Hamlin
+Aljoscha Krettek  I set up my project using the
template you suggested and I'm able to bucket and write files locally. I
also want to test writing to S3, but I don't know how to configure the `sbt
run` command to tell the FlinkMiniCluster to use the
flink-s3-fs-hadoop-1.4.0.jar and a flink-conf.yaml.

When I try running my jar via the `flink run` command I get the same:
"java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink" error. How do I overcome this
issue while still using the `flink run` command, so that I'm able to use the
flink-conf.yaml and flink-s3-fs-hadoop-1.4.0.jar?
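
A command-line sketch of one way to wire this up for `flink run` (the paths, and whether this
is the intended setup here, are assumptions):

    # copy the bundled S3 filesystem from opt/ to lib/ so the cluster classpath picks it up
    cp ./opt/flink-s3-fs-hadoop-1.4.0.jar ./lib/
    # point the client at the directory that contains flink-conf.yaml
    export FLINK_CONF_DIR=./conf
    ./bin/flink run target/my-assembly.jar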


On Fri, Jan 5, 2018 at 7:50 PM Kyle Hamlin  wrote:

> Also, I'm not using hdfs I'm trying to sink to s3.
>
> On Fri, Jan 5, 2018 at 6:18 PM Kyle Hamlin  wrote:
>
>> I have the hadoop-common.jar in my build.sbt because I was having issues
>> compiling my jar after moving from 1.3.2 to 1.4.0, because
>> org.apache.hadoop.fs.{FileSystem, Path} were no longer in Flink and I use
>> them in my custom bucketer and in my writer to write Avro out to Parquet.
>>
>> I tried adding classloader.resolve-order: parent-first to my
>> flink-conf.yaml but that didn't seem to work. I greped my jar for "hadoop"
>> and found the following:
>>
>> org/apache/hadoop/*
>> org/apache/parquet/hadoop/*
>>
>> after designating hadoop-common.jar dependency as "provided" only 
>> org/apache/parquet/hadoop/*
>> files show up. Additionally, the "RpcEngine" and "ProtobufRpcEngine" error
>> doesn't show up anymore just the following:
>>
>> java.lang.RuntimeException: Error while creating FileSystem when
>> initializing the state of the BucketingSink.
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>> hdfs://localhost:12345/
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>> ... 9 more
>>
>> Moving the hadoop-common.jar to flinks lib/ directory also doesn't appear
>> to help.
>>
>>
>> On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek 
>> wrote:
>>
>>> I think this might be happening because partial Hadoop dependencies are
>>> in the user jar and the rest is only available from the Hadoop deps that
>>> come bundled with Flink. For example, I noticed that you have Hadoop-common
>>> as a dependency which probably ends up in your Jar.
>>>
>>>
>>> On 4. Jan 2018, at 11:40, Stephan Ewen  wrote:
>>>
>>> Hi!
>>>
>>> This looks indeed like a class-loading issue - it looks like "RpcEngine"
>>> and "ProtobufRpcEngine" are loaded via different classloaders.
>>>
>>> Can you try the following:
>>>
>>>   - In your flink-conf.yml, set classloader.resolve-order: parent-first
>>>
>>> If that fixes the issue, then we can look at a way to make this
>>> seamless...
>>>
>>> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin  wrote:
>>>
 Hello,

 After moving to Flink 1.4.0 I'm getting the following error. I can't
 find anything online that addresses it. Is it a Hadoop dependency issue?
 Here are my project dependencies:

 libraryDependencies ++= Seq(
   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
   "org.apache.flink" % "flink-metrics-core" % 

Re: Back-pressure Status shows OK but records are backed up in kafka

2018-01-08 Thread Shannon Carey
Right, backpressure only measures backpressure on the inside of the Flink job. 
Ie. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max,
 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, the Task index, etc. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.

Flink manages its offsets itself, rather than acting like a “normal” consumer 
which commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you 
can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor 
or https://github.com/linkedin/Burrow etc. which polls Kafka itself and 
produces metrics about consumer lag. However, for some reason, I don’t see our 
Flink consumer metrics showing up in our lag monitoring tool or in the Kafka 
command-line tools, so I’m not sure what’s going on there. Maybe it’s because 
Flink doesn’t show up as a consumer group? At first I thought that it might be 
because we’re not setting the “group.id” property, but as it turns out we are 
indeed setting it. In any case, we have to use the job’s metrics, and monitor 
that the job is up, rather than monitoring the offset in Kafka itself. 

-Shannon
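
For completeness, a minimal Java sketch of the consumer setup described above; the Kafka 0.10
consumer is used only as an example, and the topic, group.id and schema are placeholders.
Committing offsets on checkpoints (the documented default) only affects what external lag
monitoring sees, not Flink's own guarantees:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "my-flink-job");   // lets lag tools attribute the committed offsets

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    // commit offsets back to Kafka whenever a checkpoint completes
    consumer.setCommitOffsetsOnCheckpoints(true);

    // env is the StreamExecutionEnvironment of the job
    DataStream<String> stream = env.addSource(consumer);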

On 1/8/18, 1:52 AM, "Ufuk Celebi"  wrote:

Hey Jins,

our current back pressure tracking mechanism does not work with Kafka
sources. To gather back pressure indicators we sample the main task
thread of a subtask. For most tasks, this is the thread that emits
records downstream (e.g. if you have a map function) and everything
works as expected. In case of the Kafka source though there is a
separate thread that consumes from Kafka and emits the records.
Therefore we sample the "wrong" thread and don't observe any
indicators for back pressure. :-( Unfortunately, this was not taken
into account when back pressure sampling was implemented.

There is this old issue to track this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk


On Mon, Jan 8, 2018 at 8:16 AM, Jins George  wrote:
> I have a Beam Pipeline consuming records from Kafka doing some
> transformations and writing them to HBase. I faced an issue in which records
> were being written to HBase at a slower rate than the incoming messages to Kafka,
> due to a temporary surge in the incoming traffic.
>
> From the flink UI, if I check the back pressure status, it shows OK. I 
have
> one task which has all the operators including source.
>
> Any idea why the backpressure indicator would show OK, but messages are backed
> up in Kafka?
>
> Is there any other mechanism/metrics by which I can identify this 
situation
> ?
>
> I'm running Flink 1.2/w beam 2.0.
>
> Thanks,
> Jins George





Re: Flink 1.3 -> 1.4 Kafka state migration issue

2018-01-08 Thread Gyula Fóra
Hi,
Thanks Gordon, should have read the announcement :)

This might indeed be the case here, I will just use the workaround. At
least this is a known issue, almost got a heart attack today :D

Cheers,
Gyula

Tzu-Li (Gordon) Tai  ezt írta (időpont: 2018. jan. 8.,
H, 17:56):

> Hi Gyula,
>
> Is your 1.3 savepoint from Flink 1.3.1 or 1.3.0?
> In those versions, we had a critical bug that caused duplicate partition
> assignments in corner cases, so the assignment logic was altered from 1.3.1
> to 1.3.2 (and therefore also 1.4.0).
>
> If you were indeed using 1.3.1 or 1.3.0, and you are certain that the
> savepoint does not contain duplicate partition assignments caused by the
> bug, then yes, restoring with DOP 1 and then rescaling again is a good
> workaround.
>
> Please see the 1.3.2 release announcement [1] for details.
>
> Best,
> Gordon
>
> [1] http://flink.apache.org/news/2017/08/05/release-1.3.2.html
>
>
>
> On Jan 8, 2018 6:57 AM, "Gyula Fóra"  wrote:
>
> Migrating the jobs by setting the sources to parallelism = 1 and then
> scale back up after migration seems to be a good workaround, but I am
> wondering if something I do made this happen or this is a bug.
>
> Gyula Fóra  ezt írta (időpont: 2018. jan. 8., H,
> 14:46):
>
>> Hi,
>>
>> Is it possible that the Kafka partition assignment logic has changed
>> between Flink 1.3  and 1.4? I am trying to migrate some jobs using Kafka
>> 0.8 sources and about half my jobs lost offset state for some partitions
>> (but not all partitions). Jobs with parallelism 1 dont seem to be
>> affected...
>>
>> Any ideas?
>>
>> Gyula
>>
>
>


Re: Flink 1.3 -> 1.4 Kafka state migration issue

2018-01-08 Thread Tzu-Li (Gordon) Tai
Hi Gyula,

Is your 1.3 savepoint from Flink 1.3.1 or 1.3.0?
In those versions, we had a critical bug that caused duplicate partition
assignments in corner cases, so the assignment logic was altered from 1.3.1
to 1.3.2 (and therefore also 1.4.0).

If you were indeed using 1.3.1 or 1.3.0, and you are certain that the
savepoint does not contain duplicate partition assignments caused by the
bug, then yes, restoring with DOP 1 and then rescaling again is a good
workaround.

Please see the 1.3.2 release announcement [1] for details.

Best,
Gordon

[1] http://flink.apache.org/news/2017/08/05/release-1.3.2.html


On Jan 8, 2018 6:57 AM, "Gyula Fóra"  wrote:

Migrating the jobs by setting the sources to parallelism = 1 and then scale
back up after migration seems to be a good workaround, but I am wondering
if something I do made this happen or this is a bug.

Gyula Fóra  ezt írta (időpont: 2018. jan. 8., H,
14:46):

> Hi,
>
> Is it possible that the Kafka partition assignment logic has changed
> between Flink 1.3  and 1.4? I am trying to migrate some jobs using Kafka
> 0.8 sources and about half my jobs lost offset state for some partitions
> (but not all partitions). Jobs with parallelism 1 dont seem to be
> affected...
>
> Any ideas?
>
> Gyula
>


Re: Flink 1.3 -> 1.4 Kafka state migration issue

2018-01-08 Thread Gyula Fóra
Migrating the jobs by setting the sources to parallelism = 1 and then scale
back up after migration seems to be a good workaround, but I am wondering
if something I do made this happen or this is a bug.
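
For illustration, a one-line Java sketch of that workaround (the consumer construction is a
placeholder):

    // restore the savepoint with the source at parallelism 1, take a new savepoint, then rescale
    env.addSource(new FlinkKafkaConsumer08<>("topic", schema, props)).setParallelism(1);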

Gyula Fóra  ezt írta (időpont: 2018. jan. 8., H,
14:46):

> Hi,
>
> Is it possible that the Kafka partition assignment logic has changed
> between Flink 1.3  and 1.4? I am trying to migrate some jobs using Kafka
> 0.8 sources and about half my jobs lost offset state for some partitions
> (but not all partitions). Jobs with parallelism 1 dont seem to be
> affected...
>
> Any ideas?
>
> Gyula
>


Queryable State - Count within Time Window

2018-01-08 Thread Velu Mitwa
Hi,
I want to find the number of events that happened in the last 5 minutes and
make that available as queryable state. Is it possible? It would be of great help if
someone could provide some sample code for the same.

Thanks,
Velu.
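
A minimal, untested Java sketch of one way to do this with the DataStream API; the tuple
layout, the 30-second slide and the state name are assumptions:

    // one (key, 1L) tuple per incoming event
    DataStream<Tuple2<String, Long>> events = ...;

    DataStream<Tuple2<String, Long>> counts = events
        .keyBy(0)
        .timeWindow(Time.minutes(5), Time.seconds(30))   // count over the last 5 minutes, refreshed every 30s
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                return Tuple2.of(a.f0, a.f1 + b.f1);     // sum the per-event 1s into a count
            }
        });

    // the latest window result per key becomes queryable under this name
    counts
        .keyBy(0)
        .asQueryableState("eventCountLast5Min");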


Flink 1.3 -> 1.4 Kafka state migration issue

2018-01-08 Thread Gyula Fóra
Hi,

Is it possible that the Kafka partition assignment logic has changed
between Flink 1.3 and 1.4? I am trying to migrate some jobs using Kafka
0.8 sources and about half my jobs lost offset state for some partitions
(but not all partitions). Jobs with parallelism 1 don't seem to be
affected...

Any ideas?

Gyula


Gelly: akka.ask.timeout

2018-01-08 Thread Alieh Saeedi
Hey all,
I have an iterative algorithm implemented in Gelly. Since I upgraded everything
from Flink 1.1.2 to 1.3.1, the runtime has increased and in some cases task managers
are killed. The error msg is "akka.ask.timeout". I increased akka.ask.timeout, but
the problem still exists.
Cheers,
Alieh
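
For reference, the setting lives in flink-conf.yaml; the value below is an arbitrary example,
not a recommendation:

    akka.ask.timeout: 60 s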

Re: does the flink sink only support bio?

2018-01-08 Thread Stefan Richter
Hi,

> 1.  If `snapshotState` failed at the first checkpoint, does it mean there is 
> no state and no transaction can be aborted by default?

This is a general problem and not only limited to the first checkpoint. 
Whenever you open a transaction, there is no guaranteed way to store it in 
persistent state to abort it in case of failure. In theory, your job can crash 
at any point after you just opened a transaction. So in the end I guess you 
must rely on something like a timeout-based mechanism. You can do some _best 
effort_ to proactively cancel uncommitted transactions through mechanisms such as 
keeping them in state, listing them in files, or having a fixed pool of transaction ids and 
iterating over all of them for cancellation on a restart.

> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has multiple 
> ids to be reused by producer, and it aborts all ids in this pool in the 
> `initializeState`. Is this pool designed for the situation in the first 
> problem or something I haven't noticed?

This implementation is very specific for KafkaProducer and is not necessarily a 
good blueprint for what you are planning. In particular, in this case there is 
a fixed and limited universe of all potential transaction ids that a task can 
potentially (re)use, so after a restart without state we can simply iterate all 
possible transaction ids and issue a cancel for all of them. In general, you 
don’t always know all possible transaction ids in a way that allows you to 
opportunistically cancel all potential orphaned transactions. 
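
A minimal, hypothetical Java sketch of that "fixed pool of transaction ids" pattern, written as
a plain CheckpointedFunction sink rather than the real FlinkKafkaProducer011 internals; the
TxnClient type and all of its methods are invented placeholders, and the commit step that would
run in notifyCheckpointComplete is omitted for brevity:

    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class PooledTxnSink extends RichSinkFunction<String> implements CheckpointedFunction {

        private static final int POOL_SIZE = 5;

        private transient TxnClient client;     // placeholder for a transactional DB/queue client
        private transient String currentTxnId;
        private transient int nextSlot;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            client = TxnClient.connect();       // placeholder
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            // best effort: abort every id this subtask could ever have used,
            // even if we restored with no state at all
            for (int slot = 0; slot < POOL_SIZE; slot++) {
                client.abortIfOpen(txnId(subtask, slot));
            }
            nextSlot = 0;
            currentTxnId = txnId(subtask, nextSlot);
            client.begin(currentTxnId);
        }

        @Override
        public void invoke(String value) throws Exception {
            client.addToTransaction(currentTxnId, value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // pre-commit the current transaction and rotate to the next id in the pool
            client.preCommit(currentTxnId);
            nextSlot = (nextSlot + 1) % POOL_SIZE;
            currentTxnId = txnId(getRuntimeContext().getIndexOfThisSubtask(), nextSlot);
            client.begin(currentTxnId);
        }

        private static String txnId(int subtask, int slot) {
            return "sink-" + subtask + "-" + slot;
        }
    }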

> 2018-01-04 22:15 GMT+08:00 Stefan Richter  >:
> Yes, that is how it works.
> 
> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo  > >:
> >
> > The TwoPhaseCommitSinkFunction seems to record the transaction status
> > in the state just like what I imagine above, correct?
> > and if the progress fails before commit, in the later restart, the
> > commit would be triggered again, correct? So the commit would not be
> > forgotten, correct?
> >
> > 2018-01-03 22:54 GMT+08:00 Stefan Richter  > >:
> >> I think a mix of async UPDATES and exactly-once all this might be tricky,
> >> and the typical use case for async IO is more about reads. So let’s take a
> >> step back: what would you like to achieve with this? Do you want a
> >> read-modify-update (e.g. a map function that queries and updates a DB) or
> >> just updates (like a sink based that goes against a DB). From the previous
> >> question, I assume the second case applies, in which case I wonder why you
> >> even need to be async for a sink? I think a much better approach could be
> >> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
> >> batching to lower update costs.
> >>
> >> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
> >> against your DB, similar to e.g. this example with Postgres:
> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
> >>  
> >> 
> >> .
> >>
> >> Does this help or do you really need async read-modify-update?
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo  >> >:
> >>
> >> No, I mean how to implement exactly-once db commit (given our async io
> >> target is mysql), not the state used by flink.
> >> As mentioned in previous mail, if I commit db in
> >> notifyCheckpointComplete, we have a risk to lost data (lost commit,
> >> and flink restart would not trigger notifyCheckpointComplete for the
> >> last checkpoint again).
> >> On the other hand, if I update and commit per record, the sql/stored
> >> procedure have to handle duplicate updates at failure restart.
> >>
> >> So, when or where to commit so that we could get exactly-once db ingress.
> >>
> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter  >> >:
> >>
> >>
> >> Hi,
> >>
> >>
> >> Then how to implement exactly-once async io? That is, neither missing
> >> data or duplicating data.
> >>
> >>
> >> From the docs about async IO here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> >>  
> >> 
> >> :
> >>
> >> "Fault Tolerance Guarantees:
> >> The asynchronous I/O operator offers full exactly-once fault tolerance
> >> guarantees. It stores the records for in-flight asynchronous requests in
> >> checkpoints and restores/re-triggers the requests when recovering from a
> >> failure.“
> >>
> >> So it is already handled by Flink in a way that supports exactly-once.
> >>
> >> Is there some way to index data by checkpoint id 

Re: A question about Triggers

2018-01-08 Thread Fabian Hueske
I think I got it
Glad you solved this tricky issue and thanks for sharing your solution :-)

Best, Fabian


2018-01-06 14:33 GMT+01:00 Vishal Santoshi :

> Yep, though this is suboptimal as you imagined. Two things:
>
> * The session type internally has a type that is an ultra-lite version of IN,
> only required for the path analysis.
> * Sessionization being expensive, we piggyback multiple other
> aggregations that do not depend on the path or order (count etc.).
> Essentially a Session is (ordered path + accumulated stats).
>
> The code seems pretty much all right, and please tell me if you need to see it.
> It's all generics, so no secrets here.
>
>
>
>
>
>
>
>
> On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> you would not need the ListStateDescriptor. A WindowProcessFunction
>> stores all events that are assigned to a window (IN objects in your case)
>> in an internal ListState.
>> The Iterable parameter of the process() method iterates over the
>> internal list state.
>>
>> So you would have a Trigger that fires when a new watermark is received
>> (or in regular intervals like every minute) and at the end of the window.
>> The process() method looks up the current watermark in the Context
>> object, traverses the Iterable filtering out all events with timestamp
>> > watermark (you would need to enrich the events with the timestamp which
>> can be done in a ProcessFunction), inserting the remaining ones into a
>> sorted data structure (possibly leveraging the almost sorted nature of the
>> events) and create a Session from it.
>>
>> This is probably less efficient than your ProcessFunction because
>> process() would go over the complete list over and over again and not be
>> able to persist the result of previous invocations.
>> However, the code should be easier to maintain.
>>
>> Does that make sense?
>>
>> Best, Fabian
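
A minimal, untested sketch of such a trigger, firing whenever the event-time timers it registers
are hit and purging at the end of the window; the one-minute interval is an assumption, and the
merging-window hooks (canMerge()/onMerge()) that session windows need are omitted for brevity:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class PeriodicEventTimeTrigger extends Trigger<Object, TimeWindow> {

        private static final long INTERVAL_MS = 60_000L;

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                       TriggerContext ctx) throws Exception {
            // always keep a timer at the end of the window for the final evaluation
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // and ask to be woken up again a little after the current watermark
            ctx.registerEventTimeTimer(Math.min(ctx.getCurrentWatermark() + INTERVAL_MS, window.maxTimestamp()));
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            if (time >= window.maxTimestamp()) {
                return TriggerResult.FIRE_AND_PURGE;   // final evaluation, drop the window state
            }
            ctx.registerEventTimeTimer(Math.min(time + INTERVAL_MS, window.maxTimestamp()));
            return TriggerResult.FIRE;                 // intermediate evaluation, keep the state
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }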
>>
>> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi :
>>
>>> Hello Fabian, Thank you for your response.
>>>
>>>  I thought about it and may be am missing something
>>> obvious here. The code below is what I think you suggest. The issue is that
>>> the window now is a list of Sessions (or shall we say subsets of the Session).
>>>
>>> What is required is that on a new watermark
>>>
>>> * We sort these Session objects
>>> * Get the subset that are before the new Watermark and an emit without
>>> purge.
>>>
>>> I do not see how the Trigger approach helps us. It does tell us that the
>>> watermark has progressed but to get a subset of the ListState that falls
>>> before the watermark, we would need access to *the new value  of the
>>> watermark*. That was what my initial query was.
>>>
>>>
>>>
>>> public class SessionProcessWindow<IN, OUT extends SessionState> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>>
>>>     OUT toCreateNew;
>>>     Long gap;
>>>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>>>
>>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>>                                 OUT toCreateNew) {
>>>         this.toCreateNew = toCreateNew;
>>>         mergingSetsStateDescriptor =
>>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>>     }
>>>
>>>     @Override
>>>     public void process(String s, Context context, Iterable<IN> elements,
>>>                         Collector<OUT> out) throws Exception {
>>>         OUT session = toCreateNew.createNew();
>>>         elements.forEach(f -> session.add(f));
>>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>>     }
>>> }
>>>
>>>
>>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske  wrote:
>>>
 Hi Vishal,

 thanks for sharing your solution!

 Looking at this issue again and your mail in which you shared your
 SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
 ValueState that prevents the ProcessWindowFunction to be used in a
 mergeable window.
 You could have created a new Session object in each invocation of the
 ProcessWindowFucntion and simply keep the elements in the (mergable) list
 state of the window.
 In that case you would simply need a custom trigger that calls the
 ProcessWindowFunction when a new watermark arrives. For intermediate calls,
 you just FIRE and for the final call you FIRE_AND_PURGE to remove the
 elements from the window's state.
 Did you try that?

 Best, Fabian



 2018-01-03 15:57 GMT+01:00 Vishal Santoshi :

> Dear Fabian,
>
>I was able to create a pretty functional ProcessFunction
> and here is the synopsis and please see if it makes sense.
>
> Sessionization is unique in that it entails windows of dynamic length.
> The way Flink approaches it is pretty simple. It will create a TimeWindow of
> size "gap" relative to 

Re: Service discovery for flink-metrics-prometheus

2018-01-08 Thread Chesnay Schepler
Yes, the logs are the only way to find out which port the reporter is
bound to.

We may be able to display this information in the web-UI, but it isn't
very high on my list and will probably require modifications to the
reporter interface.

On 06.01.2018 04:24, Kien Truong wrote:

Hi,

We are using YARN for deployment, so the combination of host and port for
the Prometheus reporters can be really random depending on how the
containers are co-located.

One option we thought of was scraping the logs for this information,
but it can be really messy in the long run.


Regards,
Kien

Sent from TypeApp 
On Jan 5, 2018, at 03:53, Stephan Ewen > wrote:


How are you running / deploying your Flink processes? For Service
Discovery for Prometheus on Kubernetes, there are a few articles
out there...

On Thu, Jan 4, 2018 at 3:52 PM, Aljoscha Krettek
> wrote:

I'm not aware of how this is typically done but maybe Chesnay
(cc'ed) has an idea.

> On 14. Dec 2017, at 16:55, Kien Truong <
duckientru...@gmail.com > wrote:
>
> Hi,
>
> Does anyone have recommendations about integrating
flink-metrics-prometheus with some SD mechanism
>
> so that Prometheus can pick up the Task Manager's location
dynamically ?
>
> Best regards,
>
> Kien
>






Re: Python API not working

2018-01-08 Thread Chesnay Schepler
You can't run the Python script directly; instead it must be submitted
to a Flink cluster using the pyflink.sh script, as
described in the documentation, which will in turn call the script with
the appropriate parameters.
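
For illustration, submission would look roughly like this (the script path and the exact
arguments are assumptions; see the Python API docs for the authoritative usage):

    ./bin/pyflink.sh /path/to/WordCount.py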


On 04.01.2018 11:08, Yassine MARZOUGUI wrote:

Hi all,

Any ideas on this?


2017-12-15 15:10 GMT+01:00 Yassine MARZOUGUI 
>:


Hi Ufuk,

Thanks for your response. Unfortunately specifying `streaming` or
`batch` doesn't work; it looks like the mode should be either "plan"
or "operator", and then the program expects other inputs from
stdin (id, port, etc.).

2017-12-15 14:23 GMT+01:00 Ufuk Celebi >:

Hey Yassine,

let me include Chesnay (cc'd) who worked on the Python API.

I'm not familiar with the API and what it expects, but try
entering
`streaming` or `batch` for the mode. Chesnay probably has the
details.

– Ufuk


On Fri, Dec 15, 2017 at 11:05 AM, Yassine MARZOUGUI
>
wrote:
> Hi All,
>
> I'm trying to use Flink with the python API, and started
with the wordcount
> exemple from the Documentation. I'm using Flink 1.4 and
python 2.7.
> When running env.execute(local=True), the command doesn't
execute and keeps
> waiting for input. If I hit enter again I get the following
error from
> Environment.py : ValueError("Invalid mode specified: " + mode)
> Looking at the source code, it looks like there are a bunch of
> sys.stdin.readline().rstrip('\n') where an input is expected
from the user.
> Any idea how to run the job? Thank you.
>
> Best,
> Yassine
>







Re: does the flink sink only support bio?

2018-01-08 Thread Tony Wei
Hi Stefan,

Since TwoPhaseCommitSinkFunction is new to me, I would like to know more
details.

There are two more questions:
1.  If `snapshotState` failed at the first checkpoint, does it mean there
is no state and no transaction can be aborted by default?
2. I saw FlinkKafkaProducer011 has a transaction id pool, which has
multiple ids to be reused by producer, and it aborts all ids in this pool
in the `initializeState`. Is this pool designed for the situation in the
first problem or something I haven't noticed?

Thank you.

Best Regards,
Tony Wei

2018-01-04 22:15 GMT+08:00 Stefan Richter :

> Yes, that is how it works.
>
> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo :
> >
> > The TwoPhaseCommitSinkFunction seems to record the transaction status
> > in the state just like what I imagine above, correct?
> > and if the progress fails before commit, in the later restart, the
> > commit would be triggered again, correct? So the commit would not be
> > forgotten, correct?
> >
> > 2018-01-03 22:54 GMT+08:00 Stefan Richter :
> >> I think a mix of async UPDATES and exactly-once all this might be
> tricky,
> >> and the typical use case for async IO is more about reads. So let’s
> take a
> >> step back: what would you like to achieve with this? Do you want a
> >> read-modify-update (e.g. a map function that queries and updates a DB)
> or
> >> just updates (like a sink based that goes against a DB). From the
> previous
> >> question, I assume the second case applies, in which case I wonder why
> you
> >> even need to be async for a sink? I think a much better approach could
> be
> >> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
> >> batching to lower update costs.
> >>
> >> On top of the TwoPhaseCommitSinkFunction, you could implement
> transactions
> >> against your DB, similar to e.g. this example with Postgres:
> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-
> transaction-manager-that-works-with-postgresql/
> >> .
> >>
> >> Does this help or do you really need async read-modify-update?
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
> >>
> >> No, I mean how to implement exactly-once db commit (given our async io
> >> target is mysql), not the state used by flink.
> >> As mentioned in previous mail, if I commit db in
> >> notifyCheckpointComplete, we have a risk to lost data (lost commit,
> >> and flink restart would not trigger notifyCheckpointComplete for the
> >> last checkpoint again).
> >> On the other hand, if I update and commit per record, the sql/stored
> >> procedure have to handle duplicate updates at failure restart.
> >>
> >> So, when or where to commit so that we could get exactly-once db
> ingress.
> >>
> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter  >:
> >>
> >>
> >> Hi,
> >>
> >>
> >> Then how to implement exactly-once async io? That is, neither missing
> >> data or duplicating data.
> >>
> >>
> >> From the docs about async IO here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/
> asyncio.html
> >> :
> >>
> >> "Fault Tolerance Guarantees:
> >> The asynchronous I/O operator offers full exactly-once fault tolerance
> >> guarantees. It stores the records for in-flight asynchronous requests in
> >> checkpoints and restores/re-triggers the requests when recovering from a
> >> failure.“
> >>
> >> So it is already handled by Flink in a way that supports exactly-once.
> >>
> >> Is there some way to index data by checkpoint id and records which
> >> checkpoints already commit to db? But that means we need MapState,
> >> right?
> >>
> >>
> >> The information required depends a bit on the store that you are using,
> >> maybe the last confirmed checkpoint id is enough, but maybe you require
> >> something more. This transaction information is probably not „by-key“,
> but
> >> „per-operator“, so I would suggest to use operator state (see next
> answer).
> >> Btw the implementation of async operators does something very similar to
> >> restore pending requests, and you can see the code in
> „AsyncWaitOperator".
> >>
> >>
> >> However, the async-io operator normally follows other operators, e.g.
> >> fold, so it normally faces the DataStream but not KeyedStream, and
> >> DataStream only supports ListState, right?
> >>
> >>
> >> You can use non-keyed state, aka operator state, to store such
> information.
> >> See here:
> >> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/stream/state.html#using-managed-operator-state
> >> . It does not require a KeyedSteam.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>
> >> 2018-01-03 18:43 GMT+08:00 Stefan Richter  >:
> >>
> >>
> >>
> >> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
> >>
> >> 2017-12-08 18:25 GMT+08:00 Stefan Richter  >:
> >>
> >> You 

Re: Metric reporters with non-static ports

2018-01-08 Thread Chesnay Schepler
Yes, the logs are the only way to find out which port the reporter is
bound to.

We may be able to display this information in the web-UI, but it isn't
very high on my list and will probably require modifications to the
reporter interface.

On 21.12.2017 09:41, Piotr Nowojski wrote:

I am not sure (and because of the holiday season you might not get an answer
quickly), however I do not see a way to obtain this port other than by looking
into the log files. On the other hand, I have the impression that the intention of
this feature was that if you must execute N reporters on one single machine,
you configure the port range to the size of N. That way you can just assume that
each port was taken and used.
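
A flink-conf.yaml sketch of such a port-range setup; the reporter name "prom" and the concrete
range are examples:

    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    # one port per reporter instance that may end up on the same machine
    metrics.reporter.prom.port: 9250-9260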

Maybe Chesnay will be able to answer this question better once he is back from 
the holidays.

Piotrek


On 20 Dec 2017, at 17:57, Jared Stehler  
wrote:

The prometheus metric reporter allows for a specification of a port range; is 
there a way I can find out which actual port it found to bind to?

Also, there doesn’t seem to be a way to reserve an extra port for task managers 
in mesos to assign to a metric reporter, is that a roadmap item? I’m able to 
override the port for the app master 
(-Dmetrics.reporter.prom_reporter.port=$PORT1) but this carries over to the 
task managers and can collide with the assigned data port, etc.



--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703