Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
I have some updates: some weird behaviours were found. Please refer to the
attached photo.

All requests were sent via the REST API.

The status of the savepoint triggered by that stop request (ID 11018) is
"COMPLETED [Savepoint]"; however, no checkpoint data has been persisted in S3.
The folder `savepoint-5871af-c0f2d2334501/_metadata/` was created in S3, but
there are no files in it.
This was the command I used to send the first stop request:

curl -s -d '{"drain": false, "targetDirectory": "s3://mybucket/savepoint"}' \
  -H 'Content-Type: application/json' \
  -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop

Suspecting that the s3:// scheme might be the issue, I tried to send another
stop request (ID 11020), but mistakenly typed the scheme as s3s://, so that one
failed.

Another stop request was sent (ID 11021). This one failed after a timeout (10
minutes); the GUI shows the checkpoint failed with `Checkpoint expired before
completing`.

curl -s -d '{"drain": false, "targetDirectory": "s3a://mybucket/savepoint"}' \
  -H 'Content-Type: application/json' \
  -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop

I then sent a create-savepoint request (ID 11023), and this time it completed
successfully, with files persisted to S3. Checking the Flink GUI, I could see
that the job had actually resumed before that savepoint request (a new
checkpoint was created just 30 seconds after 11021 expired).

curl -s -d '{"target-directory": "s3a://mybucket/savepoint", "cancel-job": false}' \
  -H 'Content-Type: application/json' \
  -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/savepoints


 





Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
Hi,

I'm on 1.11.0, with a streaming job running on a YARN session, reading from
Kinesis.
I tried to stop the job using REST, with "drain": false. After that POST
request, I got back a request_id (I'm not sure what I should use that for).
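My guess is that this request_id is the trigger id of the asynchronous
stop-with-savepoint operation, so its status could be polled with something
like the command below (<request_id> being the returned value) - please
correct me if that's wrong:

curl -s http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/savepoints/<request_id>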

Checking the job in the GUI, I could see that a savepoint had completed
successfully (my job has next to zero state, so that was very quick). The
watermark stopped increasing, and there were no more checkpoints after that.
However, the job's status is still ACTIVE.

Querying the job details via REST showed that the job is not stoppable (I
guess this is misleading information), and timestamps.RUNNING is not
increasing:

  "name": "MyFlinkJob",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1604016319260,
  "end-time": -1,
  "duration": 1166337471,
  "now": 1605182656731,
  "timestamps": {
"CANCELLING": 0,
"FAILING": 0,
"CANCELED": 0,
"FINISHED": 0,
"RUNNING": 1604016319495,
"FAILED": 0,
"RESTARTING": 0,
"CREATED": 1604016319260,
"RECONCILING": 0,
"SUSPENDED": 0
  },

Is this a known bug, or is it expected behaviour?
Thanks and best regards,
Averell





Re: HA on AWS EMR

2020-10-27 Thread Averell
Hello Robert,

Thanks for the info. That makes sense. I will save and cancel my jobs with
1.10, upgrade to 1.11, and restore the jobs from the savepoints.

Thanks and regards,
Averell





Re: KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

2020-10-26 Thread Averell
Hello Till,

Adding GenericRecordAvroTypeInfo(schema) does help.
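For reference, this is roughly how it can be wired in (a minimal sketch of my
understanding, using the Scala DataStream API, where the implicit
TypeInformation is picked up by the process() call):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

// With this implicit in scope (GenericRecordAvroTypeInfo comes from flink-avro),
// process[GenericRecord](...) serialises GenericRecord via Avro instead of
// falling back to Kryo.
val schema: Schema = new Schema.Parser().parse(schemaString)
implicit val genericRecordInfo: TypeInformation[GenericRecord] =
  new GenericRecordAvroTypeInfo(schema)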

Thanks and best regards,
Averell





KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

2020-10-21 Thread Averell
Hi,

I'm trying to convert a stream of JSON strings into a stream of Avro
GenericRecords and write them to Parquet files, but I get the exception below.
The exception is thrown at the line `out.collect(genericRecord)`; if there is
no sink, there is no error.

KryoException: java.lang.UnsupportedOperationException

My code is as follows:

val parquetSink: StreamingFileSink[GenericRecord] =
  StreamingFileSink
    .forBulkFormat(
      new Path(path),
      ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
    .build()

val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
  @transient private var schema: Schema = _
  @transient private var reader: GenericDatumReader[GenericRecord] = _

  override def processElement(value: String,
                              ctx: ProcessFunction[String, GenericRecord]#Context,
                              out: Collector[GenericRecord]): Unit = {
    // lazily initialise the (non-serialisable) Avro schema and reader
    if (reader == null) {
      schema = new Schema.Parser().parse(schemaString)
      reader = new GenericDatumReader[GenericRecord](schema)
    }
    try {
      val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
      out.collect(genericRecord)
    } catch {
      case e: Throwable =>
        LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
        throw e
    }
  }
})
parquetStream.addSink(parquetSink)

The schema is a simple one, with all fields being strings.
I tried with both Flink 1.10.0 and 1.11.0, and I am currently stuck at this
point.
Could you please help?

Thanks and regards,
Averell



com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProce

Re: HA on AWS EMR

2020-10-21 Thread Averell
Hello Roman,

Thanks for the answer.
I already have high-availability.storageDir configured to an S3 location. Our
service is not critical enough, so to save cost we are using the single-master
EMR setup. I understand that we won't get YARN HA in that case, but what I
expect here is the ability to quickly restore the service in case of failure.
Without ZooKeeper, when such a failure happens, I would need to find the last
checkpoint of each job and restore from there. With the HA feature, I can just
start a new flink-yarn-session and all jobs will be restored.

I tried changing the ZooKeeper dataDir config to an EFS location that both the
old and new EMR clusters can access, and that worked.

However, I now have a new question: is restoring to a new version of Flink
(e.g. state saved with Flink 1.10 and restored with Flink 1.11) expected to
work? I tried and got the error messages attached below. I'm not sure whether
that's a bug or expected behaviour.

Thanks and best regards,
Averell


07:39:33.906 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed
07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred
in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Could not start
recovered job 6e5c12f1c352dd4e6200c40aebb65745.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_265]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate
JobManager.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java

Re: HA on AWS EMR

2020-10-19 Thread Averell
Hello Roman,

Thanks for your time.
I'm using EMR 5.30.1 (Flink 1.10.0) with one master node.
yarn.application-attempts is not set (does that mean unlimited?), while
yarn.resourcemanager.am.max-attempts is 4.

By "EMR cluster crashed" I meant that the cluster is lost. Some scenarios
which could lead to this are:
  - The master node goes down
  - The cluster is accidentally / deliberately terminated.

I found a thread in our mailing list [1] in which Fabian mentioned a "pointer"
stored in ZooKeeper. It looks like this piece of information is stored in
ZooKeeper's dataDir, which by default lives on the local storage of the EMR
master node. I'm trying to move it to EFS, in the hope that it will help. I'm
not sure whether this is the right approach.

Thanks for your help.
Regards,
Averell


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html





HA on AWS EMR

2020-10-19 Thread Averell
Hi,

I'm trying to enable HA for my Flink jobs running on AWS EMR.
Following [1], I created a common Flink YARN session and submitted all my jobs
to it. These four config params were added:

high-availability = zookeeper
high-availability.storageDir =
high-availability.zookeeper.path.root = /flink
high-availability.zookeeper.quorum = :2181

(The ZooKeeper that comes with EMR was used.)

The command to start that Flink YARN session is like this:

flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm FlinkCommonSession -z FlinkCommonSession -d

The first HA test - YARN application killed - went well. I killed that common
session using `yarn application --kill` and created a new session with the
same command; the jobs were restored automatically after the new session was
up.

However, the 2nd HA test - EMR cluster crashed - didn't work: the jobs were
not restored after the common session was created on the new EMR cluster
(jobmanager.gz attached:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>).

Should I expect the jobs to be restored in scenario no. 2 (EMR cluster
crashed)? Am I missing something here?

Thanks for your help.

Regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html






Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-09-12 Thread Averell
Hello Robert,

I'm not sure why the screenshot I attached in the previous post was not shown;
I'm re-attaching it in this post.
As shown in the screenshot, part-1-33, part-1-34, and part-1-35 have already
been closed, but the temp file for part-1-33 is still there.

Thanks and regards
Averell 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/FlinkFileSink.png>
 





Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-31 Thread Averell
Hello Robert, Arvid,

I am running on EMR, and currently AWS only supports version 1.10.
I tried both solutions that you suggested ((i) copying a SAXParser
implementation to the plugins folder and (ii) using the S3FS plugin from
1.10.1), and both worked - I could get successful checkpoints.

However, my checkpoints still fail intermittently (about 10% of the time), and
whenever one fails, there are incomplete files left in S3 (screenshot attached
below).
I'm not sure whether those incomplete files are expected, or whether that is a
bug.

Thanks and regards,
Averell
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2020-08-28_at_11.png>
 





Re: JSON to Parquet

2020-08-27 Thread Averell
Hi Dawid,

Thanks for the suggestion. So basically I'll need to use the JSON connector to
get the JSON strings into Rows, and then go from Rows to Parquet records using
the Parquet connector?
I have never tried the Table API before; I have been using the Streaming API
only. I will follow your suggestion now.

Thanks for your help.
Regards,
Averell





SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-27 Thread Averell
Hello,

I have a Flink 1.10 job which runs on AWS EMR, checkpointing to s3a as well as
writing output to s3a using StreamingFileSink. The job runs well until I add
the Hadoop property -Dfs.s3a.acl.default=BucketOwnerFullControl; after that,
the checkpoint process fails to complete with:

Caused by: org.xml.sax.SAXException: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found

I tried adding a jar file with that class
(https://mvnrepository.com/artifact/xerces/xercesImpl/2.12.0) to my flink/lib/
directory, and then got the same error but a different stack trace:

Caused by: org.apache.flink.util.SerializedThrowable: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found

This seems to be a dependency conflict, but I couldn't track down its root
cause. In my IDE I don't have any dependency issues, and I couldn't find
SAXParser in the dependency tree.

Here is the stack trace when the jar file is not there:

Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on
s3a://mybucket/checkpoint/a9502b1c81ced10dfcbb21ac43f03e61/chk-2/41f51c24-60fd-474b-9f89-3d65d87037c7:
com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to create
an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:164)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 17 more
Caused by: com.amazonaws.SdkClientException: Couldn't initialize a SAX
driver to create an XMLReader
at
com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.(XmlResponsesSaxParser.java:118)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:87)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
at
com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4266)
at
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:876)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$5(S3AFileSystem.java:1262)
at
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at

JSON to Parquet

2020-08-20 Thread Averell
Hello,

I have a stream in which each message is a JSON string with a fairly complex
schema (multiple fields, multiple nested layers), and I need to write it to
Parquet files after some slight modification/enrichment.

I wonder what options are available for doing that. I'm thinking of JSON ->
Avro (GenericRecord) -> Parquet. Is that an option? I would want to be able to
change the JSON schema quickly/dynamically (with as little code change as
possible).

Thanks and regards,
Averell





Re: Automatically resuming failed jobs in K8s

2020-06-12 Thread Averell
Thank you very much, Yang.





Automatically resuming failed jobs in K8s

2020-06-10 Thread Averell
Hi,
I'm running some jobs using native Kubernetes. Sometimes, due to some
unrelated issue with our K8s cluster (e.g. a K8s node crashes), my Flink pods
are gone. The JM pod, as it is deployed using a Deployment, is re-created
automatically; however, all of my jobs are lost.
What I have to do now is:
1. Re-upload the jars
2. Find the path to the last checkpoint of each job
3. Resubmit the jobs

Is there any existing option to automate those steps? E.g.:
1. Can I use a jar file stored in the JM's file system or on S3 instead of
uploading the jar file via the REST interface?
2. When restoring a job, I need to provide the full path of the last
checkpoint (s3:chk-2345/). Is there any option to provide just the base path?
3. Can I store the info needed to restore the jobs in the K8s deployment
config?

Thanks a lot.

Regards,
Averell





Re: Flink Metrics in kubernetes

2020-05-13 Thread Averell
Hi Gary,

Sorry for the false alarm. It's caused by a bug in my deployment - no
metrics were added into the registry.
Sorry for wasting your time.

Thanks and best regards,
Averell 





Re: Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi Gary,

Thanks for the help.
Below is the output from jstack; it does not seem to be blocked.



In my JobManager log there's this WARN; I am not sure whether it's relevant at
all.


Attached is the full jstack dump  k8xDump.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/k8xDump.txt>
 
.

Thanks and regards,
Averell





Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi,

I'm trying to configure Flink running in native Kubernetes to push some
metrics to New Relic (using a custom ScheduledDropwizardReporter).

From the logs, I could see that an instance of ScheduledDropwizardReporter has
already been created successfully (the overridden getReporter() method
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java#L234>
was called).
An instance of MetricRegistryImpl
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L141>
was also created successfully (this log line was shown: "Periodically
reporting metrics in intervals of 30 SECONDS for reporter
my_newrelic_reporter").
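For reference, the reporter is registered via flink-conf.yaml roughly along
these lines (the class name below is a placeholder for my custom reporter; the
interval matches the log line above):

metrics.reporter.my_newrelic_reporter.class: com.mycompany.metrics.NewRelicDropwizardReporter
metrics.reporter.my_newrelic_reporter.interval: 30 SECONDS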

However, the report() method
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java#L30>
was not called.

When running on my laptop, there's no issue at all.
Are there any special things I need to take care of when running in
Kubernetes?

Thanks a lot.

Regards,
Averell







Re: K8s native - checkpointing to S3 with RockDBStateBackend

2020-04-26 Thread Averell
Hi David, Yang,

Thanks. But I just tried submitting the same job on a YARN cluster using that
same uber jar, and it was successful. I don't have flink-s3-fs-hadoop.jar
anywhere in the lib or plugins folder.

Thanks and regards,
Averell





Re: K8s native - checkpointing to S3 with RockDBStateBackend

2020-04-24 Thread Averell
Thank you Yun Tang.
Building my own docker image as suggested solved my problem.

However, I don't understand why I need that when I already have the s3-hadoop
jar included in my uber jar.

Thanks.
Regards,
Averell





Re: Question about Scala Case Class and List in Flink

2020-04-24 Thread Averell
Hi Timo,

This is my case class:

case class Box[T](meta: Metadata, value: T) {
  def function1: A => B = {...}
  def method2(...): A = {...}
}

However, I still get the warning: "Class class data.package$Box cannot be used
as a POJO type because not all fields are valid POJO fields, and must be
processed as GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance."

I imported org.apache.flink.streaming.api.scala._ - is this enough to tell
Flink that I am using the Scala API?

Thanks and regards,
Averell





K8s native - checkpointing to S3 with RockDBStateBackend

2020-04-23 Thread Averell
Hi,
I am trying to deploy my job to Kubernetes following the native Kubernetes
guide. My job checkpoints to S3 with the RocksDB state backend and also has an
S3 StreamingFileSink.
In my jar file I already have flink-hadoop-fs, flink-connector-filesystem, and
flink-s3-fs-hadoop (as far as I understand, these are for the S3 sink - please
correct me if I'm wrong).

When I tried to submit the job, I got the following error (only a few seconds
after submitting): Could not find a file system implementation for scheme
's3'. The scheme is not directly supported by Flink and no Hadoop file system
to support this scheme could be loaded.

I'm not sure how to get past this.
Using s3a didn't help (s3 works well when running on my dev machine).
I also tried copying flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to the
/opt/flink/lib/ folder of the JobManager pod, but it didn't help (is it
already too late at that point? Should it be there before the JM is started?)
Averell


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
checkpoint storage at checkpoint coordinator side.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:282)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:205)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:215)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:120)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:266)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3a'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:64)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:477)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:279)
... 23 more





Re: Change to StreamingFileSink in Flink 1.10

2020-04-22 Thread Averell
Thanks @Seth Wiesman and all.





Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Averell
Hello Leonard, Sivaprasanna,

But my code was working fine with Flink v1.8.
I also tried with a simple String DataStream, and got the same error.
StreamingFileSink
  .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
  .withRollingPolicy(DefaultRollingPolicy.builder().build())
  .withBucketAssigner(new DateTimeBucketAssigner)
  .build()
(screenshot below)

 

It's weird. At first I thought it was something wrong with IntelliJ, but I got
the same error when running mvn from the command line.








Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I tried to add the following cast, and it works. It doesn't look nice, though:

StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner).asInstanceOf[RowFormatBuilder[IN, String, _]]
  .build()

Thanks and regards,
Averell





Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi Sivaprasanna,

That is a compile-time error, not a runtime error:

value build is not a member of ?0
possible cause: maybe a semicolon is missing before `value build'?

There is no issue when using either withRollingPolicy() or withBucketAssigner()
on its own - only when both are chained.

Thanks and regards,
Averell





Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I have the following code:

StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner)
  .build()

This works fine in Flink 1.8.3. However, when I try to compile with Flink
1.10.0, I get the following error:

value build is not a member of ?0
possible cause: maybe a semicolon is missing before `value build'?

As per the hints from IntelliJ:
.forRowFormat returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
.withRollingPolicy(...) returns a RowFormatBuilder[_]
.withBucketAssigner(...) returns Any

I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. I tried with and
without IntelliJ; no difference.

I'm not sure what's wrong here.

Thanks!
Averell






Re: Preserving (best effort) messages order between operators

2019-11-01 Thread Averell
Hi Yun,

I found the cause of the issue.
That ContinuousFileReaderOperator (my operator B) uses a PriorityQueue that
maintains a buffer sorted by modTime, so my records were re-ordered.
I don't understand the reason for using a PriorityQueue instead of an ordinary
Queue there, though.

Thanks.
Averell 





Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

I'm not sure what you meant by "query it using SQL". Do you mean running
ad-hoc SQL queries on that joined data? If that's what you meant, then you'll
need some SQL server first, and then write the joined data to it.
Elasticsearch and Cassandra are ready-to-use options. Writing a custom sink
function to write to your own SQL server is also a not-so-difficult solution.

Best regards,
Averell





Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

I wonder why you need to "source" your intermediate state from files? Why
not "source" it from the previous operator? I.e. instead of (A join B) ->
State -> files -> (C), why not do (A join B) -> State -> (files + C)?





Preserving (best effort) messages order between operators

2019-10-30 Thread Averell
Hi, 

I have a source function with parallelism = 1, sending out records ordered by
event time. These records are then rebalanced to the next operator, which has
parallelism > 1. I observed that within each subtask of the 2nd operator, the
order of the messages is not maintained. Is this behaviour expected? If it is,
is there any way to avoid it, or at least reduce it?
I have high back-pressure on that 2nd operator, as the one after it is slow.
There is also high back-pressure on the 1st operator, which makes my problem
more severe (the out-of-orderness is high). If I could throttle the 1st
operator when back-pressure is high, I could mitigate the problem, but I could
not find any guide on doing that.

Could you please help?

Thanks.
Averell





How to generate a sequential watermark which increases by one unit each time

2019-05-21 Thread Averell
Hi everyone,

I have a stream of files, and each file has multiple records. A record has one
Long field named `ts`, which holds the file creation time (so it increases
over time and has the same value for all records in a given file). However, I
need a watermark that increases by exactly one unit for each file.

I thought of extending the AssignerWithPeriodicWatermarks interface with a
member variable holding that sequence value. However, it seems to me that it
is not possible to persist that value across checkpoints.
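In code, the idea would look roughly like this (a sketch only; FileRecord
stands in for my real record type, and the plain member variables are exactly
the part that would not survive a checkpoint/restore):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class FileRecord(ts: Long) // ts = file creation time, same for all records of a file

class PerFileSequenceAssigner extends AssignerWithPeriodicWatermarks[FileRecord] {
  private var lastTs: Long = Long.MinValue  // ts of the file currently being read
  private var sequence: Long = 0L           // increases by one per file

  override def extractTimestamp(element: FileRecord, previousElementTimestamp: Long): Long = {
    if (element.ts != lastTs) { // first record seen from a new file
      lastTs = element.ts
      sequence += 1
    }
    sequence
  }

  override def getCurrentWatermark: Watermark = new Watermark(sequence - 1)
}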

Are there any options for me?

Thanks and best regards,
Averell





Re: How to export all not-null keyed ValueState

2019-05-10 Thread Averell Huyen Levan
Thank you very much, Fabian.

Regards,
Averell

On Fri, May 10, 2019 at 9:46 PM Fabian Hueske  wrote:

> Hi Averell,
>
> I'd go with your approach any state access (given that you use RocksDB
> keyed state) or deduplication of messages is going to be more expensive
> than a simple cast.
>
> Best, Fabian
>
> On Fri, 10 May 2019 at 13:08, Averell Huyen Levan <
> lvhu...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for that. However, as I mentioned in my previous email, that
>> implementation requires a lot of typecasting/object wrapping.
>> I tried to broadcast that Toggle stream - the toggles will be saved as a
>> MapState, and whenever an export trigger record arrived, I send out that
>> MapState. There's no use of applyToKeyedState in this implementation.
>> And the problem I got is I received duplicated output (one from each
>> parallelism-instance).
>>
>> Is there any option to modify the keyed state from within the
>> processBroadcastElement() method?
>>
>> Thanks a lot for your help.
>>
>> Regards,
>> Averell
>>
>>
>> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske  wrote:
>>
>>> Hi Averell,
>>>
>>> Ah, sorry. I had assumed the toggle events where broadcasted anyway.
>>> Since you had both streams keyed, your current solution looks fine to me.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Fri, 10 May 2019 at 03:13, Averell Huyen Levan <
>>> lvhu...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Sorry, but I am still confused about your guide. If I union the Toggle
>>>> stream with the StateReportTrigger stream, would that means I need to make
>>>> my Toggles broadcasted states? Or there's some way to modify the keyed
>>>> states from within the processBroadcastElement() method?
>>>>
>>>> I tried to implement the other direction (which I briefed in my
>>>> previous email). It seems working, but I am not confident in that, not sure
>>>> whether it has any flaws. Could you please give your comment?
>>>> In my view, this implementation causes a lot of type-casting for my
>>>> main data stream, which might cause a high impact on performance (my toggle
>>>> is on in only about 1% of the keys, and the rate of input1.left is less
>>>> than a millionth comparing to the rate of input1.right)
>>>>
>>>> /**
>>>>   * This KeyedBroadcastProcessFunction has:
>>>>   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>>   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>>   *  When Toggle._2 == true, records from input1.right for the 
>>>> same key will be forwarded to the main output.
>>>>   *  If it is false, records from input1.right for that same key 
>>>> will be dropped
>>>>   *   input1.right: the main data stream
>>>>   *
>>>>   *input2: a broadcasted stream of StateReport triggers. When a record 
>>>> arrived on this stream,
>>>>   *   the current value of Toggles will be sent out via the outputTag
>>>>   */
>>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
>>>> Any, MyEvent] {
>>>>
>>>>val toggleStateDescriptor = new 
>>>> ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>>
>>>>override def processElement(in1: Either[Toggle, MyEvent],
>>>> readOnlyContext: 
>>>> KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, 
>>>> MyEvent]#ReadOnlyContext,
>>>> collector: Collector[MyEvent]): Unit = {
>>>>   in1 match {
>>>>  case Left(toggle) =>
>>>> 
>>>> getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>>>  case Right(event) =>
>>>> if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>>>collector.collect(event)
>>>>   }
>>>>}
>>>>
>>>>override def processBroadcastElement(in2: Any,
>>>>context: KeyedBroadcastProcessFunction[Key, 
>>>> Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>>  

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Averell Huyen Levan
Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that
implementation requires a lot of typecasting/object wrapping.
I tried broadcasting that Toggle stream: the toggles are saved as a MapState,
and whenever an export-trigger record arrives, I send out that MapState. There
is no use of applyToKeyedState in this implementation. The problem is that I
receive duplicated output (one copy from each parallel instance).

Is there any option to modify the keyed state from within the
processBroadcastElement() method?

Thanks a lot for your help.

Regards,
Averell


On Fri, May 10, 2019 at 8:52 PM Fabian Hueske  wrote:

> Hi Averell,
>
> Ah, sorry. I had assumed the toggle events where broadcasted anyway.
> Since you had both streams keyed, your current solution looks fine to me.
>
> Best,
> Fabian
>
> On Fri, 10 May 2019 at 03:13, Averell Huyen Levan <
> lvhu...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Sorry, but I am still confused about your guide. If I union the Toggle
>> stream with the StateReportTrigger stream, would that means I need to make
>> my Toggles broadcasted states? Or there's some way to modify the keyed
>> states from within the processBroadcastElement() method?
>>
>> I tried to implement the other direction (which I briefed in my previous
>> email). It seems working, but I am not confident in that, not sure whether
>> it has any flaws. Could you please give your comment?
>> In my view, this implementation causes a lot of type-casting for my main
>> data stream, which might cause a high impact on performance (my toggle is
>> on in only about 1% of the keys, and the rate of input1.left is less than a
>> millionth comparing to the rate of input1.right)
>>
>> /**
>>   * This KeyedBroadcastProcessFunction has:
>>   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
>>   *  When Toggle._2 == true, records from input1.right for the same 
>> key will be forwarded to the main output.
>>   *  If it is false, records from input1.right for that same key 
>> will be dropped
>>   *   input1.right: the main data stream
>>   *
>>   *input2: a broadcasted stream of StateReport triggers. When a record 
>> arrived on this stream,
>>   *   the current value of Toggles will be sent out via the outputTag
>>   */
>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
>> Any, MyEvent] {
>>
>>val toggleStateDescriptor = new 
>> ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>
>>override def processElement(in1: Either[Toggle, MyEvent],
>> readOnlyContext: KeyedBroadcastProcessFunction[Key, 
>> Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>> collector: Collector[MyEvent]): Unit = {
>>   in1 match {
>>  case Left(toggle) =>
>> 
>> getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>  case Right(event) =>
>> if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>collector.collect(event)
>>   }
>>}
>>
>>override def processBroadcastElement(in2: Any,
>>context: KeyedBroadcastProcessFunction[Key, 
>> Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>collector: Collector[MyEvent]): Unit = {
>>   context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: 
>> ValueState[Boolean]) =>
>>  if (s != null) context.output(outputTag, (k, s.value(
>>}
>> }
>>
>> Thanks for your help.
>> Regards,
>> Averell
>>
>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Passing a Context through a DataStream definitely does not work.
>>> You'd need to have the keyed state that you want to scan over in the
>>> KeyedBroadcastProcessFunction.
>>>
>>> For the toggle filter use case, you would need to have a unioned stream
>>> with Toggle and StateReport events.
>>> For the output, you can use side outputs to route the different outputs
>>> to different streams.
>>>
>>> Best, Fabian
>>>
>>>> On Thu, 9 May 2019 at 10:34, Averell  wrote:
>>>
>>>> Thank you Congxian and Fabian.
>>>>
>>>> @Fabian: could you please

Re: How to export all not-null keyed ValueState

2019-05-09 Thread Averell Huyen Levan
Hi Fabian,

Sorry, but I am still confused about your guidance. If I union the Toggle
stream with the StateReportTrigger stream, would that mean I need to make my
Toggles broadcast state? Or is there some way to modify the keyed state from
within the processBroadcastElement() method?

I tried implementing the other direction (which I outlined in my previous
email). It seems to work, but I am not confident in it, and I'm not sure
whether it has any flaws. Could you please give your comments?
In my view, this implementation causes a lot of type-casting for my main data
stream, which might have a high impact on performance (my toggle is on for
only about 1% of the keys, and the rate of input1.left is less than a
millionth of the rate of input1.right).

/**
  * This KeyedBroadcastProcessFunction has:
  *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
  *      input1.left: Toggles in the form of a tuple (Key, Boolean).
  *         When Toggle._2 == true, records from input1.right for the same key
  *         will be forwarded to the main output.
  *         If it is false, records from input1.right for that same key will be dropped.
  *      input1.right: the main data stream
  *
  *   input2: a broadcasted stream of StateReport triggers. When a record arrives
  *      on this stream, the current value of Toggles will be sent out via the outputTag.
  */
class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
  extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

  val toggleStateDescriptor =
    new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

  override def processElement(in1: Either[Toggle, MyEvent],
                              readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
                              collector: Collector[MyEvent]): Unit = {
    in1 match {
      case Left(toggle) =>
        getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
      case Right(event) =>
        if (getRuntimeContext.getState(toggleStateDescriptor).value())
          collector.collect(event)
    }
  }

  override def processBroadcastElement(in2: Any,
                                       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
                                       collector: Collector[MyEvent]): Unit = {
    context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
      if (s != null) context.output(outputTag, (k, s.value())))
  }
}

Thanks for your help.
Regards,
Averell

On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:

> Hi,
>
> Passing a Context through a DataStream definitely does not work.
> You'd need to have the keyed state that you want to scan over in the
> KeyedBroadcastProcessFunction.
>
> For the toggle filter use case, you would need to have a unioned stream
> with Toggle and StateReport events.
> For the output, you can use side outputs to route the different outputs to
> different streams.
>
> Best, Fabian
>
> On Thu, 9 May 2019 at 10:34, Averell  wrote:
>
>> Thank you Congxian and Fabian.
>>
>> @Fabian: could you please give a bit more details? My understanding is: to
>> pass the context itself and an OutputTag to the KeyedStateFunction
>> parameter
>> of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>> within that KeyedStateFunction.process() send out the side output. Am I
>> understand your idea correctly?
>>
>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>> practice: I am having two streams: Data and Toggle. The stream Toggle is
>> just a keyed boolean stream, being used to filter data from the stream
>> Data.
>> And I am implementing that filter using a simple RichCoFlatMapFunction.
>>
>> Now that I want to export the list of keys which are currently toggled on.
>> Should I
>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>> Toggle and BroadCast as the input streams), or
>> (2) replace that RichCoFlatMapFunction with a new
>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>> export? Doing this would require unioning Toggle and Data into one single
>> keyed stream.
>>
>> Thanks and best regards,
>> Averell
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: How to export all not-null keyed ValueState

2019-05-09 Thread Averell
Thank you Congxian and Fabian.

@Fabian: could you please give a bit more detail? My understanding is: pass
the context itself and an OutputTag to the KeyedStateFunction parameter of
KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from within
that KeyedStateFunction.process() send out the side output. Am I understanding
your idea correctly?

BTW, I have another question regarding KeyedBroadcastProcessFunction best
practice: I have two streams, Data and Toggle. The Toggle stream is just a
keyed boolean stream, used to filter data from the Data stream, and I am
implementing that filter using a simple RichCoFlatMapFunction.
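For context, that filter is roughly like this (a simplified sketch with
assumed types; the real key and record types are more complex):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Toggle = (key, flag); the data records are plain Strings here. Both connected
// streams are keyed by the same key, so the ValueState is per key.
class ToggleFilter extends RichCoFlatMapFunction[(String, Boolean), String, String] {
  private var toggle: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit =
    toggle = getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("toggle", classOf[Boolean]))

  // Toggle stream: remember the latest on/off flag for the current key
  override def flatMap1(value: (String, Boolean), out: Collector[String]): Unit =
    toggle.update(value._2)

  // Data stream: forward a record only while the toggle for its key is on
  override def flatMap2(value: String, out: Collector[String]): Unit =
    if (toggle.value()) out.collect(value)
}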

Now I want to export the list of keys which are currently toggled on. Should I
(1) have one additional KeyedBroadcastProcessFunction operator (which has
Toggle and Broadcast as its input streams), or
(2) replace that RichCoFlatMapFunction with a new
KeyedBroadcastProcessFunction which has both functionalities (filter and
export)? Doing this would require unioning Toggle and Data into one single
keyed stream.

Thanks and best regards,
Averell 






How to export all not-null keyed ValueState

2019-05-07 Thread Averell
Hi,

I have a keyed ValueState which is populated for only about 1% of the keys
that I have. Is there any way to get the values of all those non-null states?
I looked at the queryable state option, but it seems to support querying by
individual key only.

Thanks and best regards,
Averell





Re: I want to use MapState on an unkeyed stream

2019-05-06 Thread Averell
From my understanding, having a fake keyBy (stream.keyBy(r => "dummyString"))
means there would be only one slot handling the data.
Would a broadcast function [1] work for your case?
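For illustration, a rough sketch of what that could look like (hypothetical
names and plain String types; the broadcast state from [1] plays the role of
the MapState and is available in every parallel instance of the unkeyed main
stream):

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastStateSketch {
  val controlDescriptor = new MapStateDescriptor[String, String](
    "controlState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  def apply(mainStream: DataStream[String],
            controlStream: DataStream[(String, String)]): DataStream[String] =
    mainStream
      .connect(controlStream.broadcast(controlDescriptor))
      .process(new BroadcastProcessFunction[String, (String, String), String] {
        // called for every element of the (unkeyed, parallel) main stream
        override def processElement(
            value: String,
            ctx: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          val state = ctx.getBroadcastState(controlDescriptor) // read-only here
          if (state.contains(value)) out.collect(s"$value -> ${state.get(value)}")
        }

        // called for every broadcast ("control") element, on every parallel instance
        override def processBroadcastElement(
            value: (String, String),
            ctx: BroadcastProcessFunction[String, (String, String), String]#Context,
            out: Collector[String]): Unit =
          ctx.getBroadcastState(controlDescriptor).put(value._1, value._2)
      })
}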

Regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html





IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-05 Thread Averell
Hi everyone,

I have a big stream A, filtered by flags from a small stream B, then unioned
with another stream C to become the input for my CEP.
As the three streams A, B, and C are all keyed, I expected that the stream
resulting from connecting/unioning them would also be keyed, so I used
reinterpretAsKeyedStream before feeding it into CEP. With this, I get an
IllegalArgumentException (full stack trace below).
If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
use keyBy manually), there is no such exception.

I don't know how to debug this error, and I'm not sure whether I should be
using keyed streams with CEP this way.

Thanks and best regards,
Averell


My code:
val cepInput = streamA.keyBy(r => (r.id1, r.id2))
  .connect(streamB.keyBy(r => (r.id1, r.id2)))
  .flatMap(new MyCandidateFilterFunction())
  .union(streamC.keyBy(r => (r.id1, r.id2)))

val cepOutput =
  MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
    counter1, counter2,
    threshold1, threshold2)

object MyCEP {
  def apply(input: KeyedStream[Event, _],
            longPeriod: Int,
            threshold: Int,
            shortPeriod: Int): DataStream[Event] = {

    val patternLineIsUp = Pattern.begin[Event]("period1")
      .where((value: Event, ctx: CepContext[Event]) =>
        accSum(_.counter, Seq("period1"), value, ctx) < threshold)
      .times(longPeriod - shortPeriod).consecutive()
      .next("period2")
      .where((value: Event, ctx: CepContext[Event]) =>
        accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold
          && value.status == "up")
      .times(shortPeriod).consecutive()

    collectPattern(input, patternLineIsUp)
  }

  private def accSum(f: Event => Long, keys: Seq[String], currentEvent: Event,
                     ctx: CepContext[Event]): Long = {
    keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
  }

  private def collectPattern(inputStream: KeyedStream[Event, _],
                             pattern: Pattern[Event, Event]): DataStream[Event] =
    CEP.pattern(inputStream, pattern)
      .process((map: util.Map[String, util.List[Event]],
                ctx: PatternProcessFunction.Context,
                collector: Collector[Event]) => {
        val records = map.get("period2")
        collector.collect(records.get(records.size() - 1))
      })
}

The exception:
Exception in thread "main" 12:43:13,103 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka
RPC service.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.mycompany.StreamingJob$.main(Streaming.scala:440)
at com.mycompany.StreamingJob.main(Streaming.scala)
Caused by: java.lang.IllegalArgumentException
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
at
org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
at
org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.j

Re: CoProcessFunction vs Temporal Table to enrich multiple streams with a slow stream

2019-05-03 Thread Averell
Thank you Piotr for the thorough answer.

So you mean that an implementation in the DataStream API, with some corners
cut, would generally be shorter than a Table join. I thought that using Tables
would be more intuitive and shorter, hence my initial question :)

Regarding all the limitations of the Table API that you mentioned, is there
any summary page in the Flink docs for them?

Thanks and regards,
Averell






CoProcessFunction vs Temporal Table to enrich multiple streams with a slow stream

2019-05-03 Thread Averell
Hi,

Back to my story about enriching two different streams with data from one
(slow stream) using Flink's low lever functions like CoProcessFunction
(mentioned in this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoFlatMapFunction-with-more-than-two-input-streams-td22320.html)

Now I see that Flink Table also support doing something similar with
Temporal Table [1]. With this, I would only need to convert my enrichment
stream to be a Temporal table, and the two other streams into two unbounded
tables.

*/In terms of performance and resource usage/*, would this way of
implementation (using Flink Table) be better than the option no.1 mentioned
in my other thread: creating two different (though similar)
CoProcessFunction's, maintaining two state tables (for the enrichment
stream, one in each function)?

Thanks and best regards,
Averell 

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timestamp and key preservation over operators

2019-05-03 Thread Averell
Thank you Fabian.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timestamp and key preservation over operators

2019-05-03 Thread Averell
Thank you Fabian.

One more question from me on this topic: as I send out early messages in my
window function, the timestamp assigned by the window function (the end time
of the window) is not what I expect. I want it to be the time of the
(last) message that triggered the output.

Is there any way to accomplish that?
Currently, I have an assignTimestampsAndWatermarks after my window function,
but, as you said, that is against best practice.
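What I have now is roughly the following (a simplified sketch; MyEvent and its
eventTime field are placeholders for my real types):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    case class MyEvent(key: String, eventTime: Long, value: Double)

    // windowOutput is the DataStream[MyEvent] coming out of my early-firing window function;
    // I re-extract the timestamps from the records themselves right after the window.
    def reassignTimestamps(windowOutput: DataStream[MyEvent]): DataStream[MyEvent] =
        windowOutput.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
                override def extractTimestamp(e: MyEvent): Long = e.eventTime
            })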

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timestamp and key preservation over operators

2019-05-02 Thread Averell
Thank you Fabian.

I have one more question about timestamps:
In the previous email, you asked how I checked the timestamp - I don't have an
answer. Back then I only checked the watermark, not the timestamp. I had
the (wrong) assumption that watermarks advance along with timestamps.
Today I played with that early-trigger window, putting the output into a
table, and found that the timestamp is set to the window's end time, but the
watermark seems not to be. (My window is [10:00-10:15), my incoming msgs both
have a timestamp of 10:00, which triggers one early output with timestamp
10:14:59.999, but the watermark stays at 10:00.)

Thus, my question: what is the easiest way to check the timestamp of a
message?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timestamp and key preservation over operators

2019-05-01 Thread Averell
Hi Fabian, Guowei,

I have some updates:
1. I added a timestamp extractor on all of my remaining sources (3 &
4), and the watermark does propagate to my final operator.
2. As I could not find a way to set my file sources as IDLE, I tried to
tweak the class ContinuousFileReaderOperator to be always IDLE:
/   nextElement = format.nextRecord(nextElement);
    if (nextElement != null) {
        readerContext.collect(nextElement);
        if (this.format.getFilePaths()[0].getPath().contains(""))
            readerContext.markAsTemporarilyIdle();
    } else {
/ and the result I got was that there is no watermark at all for that stream,
and the IDLE status seemed not to be taken into account (my CEP operator didn't
generate any output). So I do not understand what that IDLE StreamStatus is for.
My temporary solution, for now, is to use MAX_WATERMARK for those idle
sources. Not sure whether doing that is recommended?
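The workaround looks roughly like this (a sketch only; EnrichRecord stands in
for my enrichment record type):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    case class EnrichRecord(key: String, payload: String)

    // force the (mostly idle) enrichment stream to never hold back the downstream watermark
    def neverHoldBackWatermark(enrichStream: DataStream[EnrichRecord]): DataStream[EnrichRecord] =
        enrichStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[EnrichRecord] {
            override def getCurrentWatermark: Watermark = Watermark.MAX_WATERMARK
            // the timestamp itself does not matter for this stream
            override def extractTimestamp(element: EnrichRecord, previousTimestamp: Long): Long = previousTimestamp
        })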

Thanks for your help.
Regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timestamp and key preservation over operators

2019-04-30 Thread Averell
Hi Fabian, Guowei

Thanks for the help. My flow is as in the attached photo, where (1) and (2) are
the main data streams from file sources, while (3) and (4) are the enrichment
data, also from file sources.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png>
 

(5) is to merge-parse (1) and (2), and consists of:
- A tumbling window function with an early trigger (based on the number of
records in the window: FIRE when there is at least one msg from each of
streams 1 & 2, not waiting for the window end-time)
- A flat map function to parse the incoming msg
- A filter and a map

(6) works as a data enricher, to enrich the output of (5) with data from (3) and
(4). As (4) is broadcasted, my implementation for (6) is like:
/stream5.union(stream3).keyBy(key2).connect(stream4).process(MyFunction6
extends KeyedBroadcastProcessFunction)/
In this KeyedBroadcastProcessFunction, one msg from (5) triggers one output,
while a msg from (3) or (4) doesn't send out any records, but only updates the
states.

Regarding message types:
- Outputs of (1) and (2) are of the same type EventType1.
- Output of (3) is of type EventType2_1 extends EventType2
- Output of (5) is of type EventType2_2 extends EventType2
- Input of (6) is of type EventType2 (from the unioned keyed stream), and of type Type3 (from the broadcast stream)
- Output of (6) is of type EventType2_3, which is mapped from EventType2_1

As seen in my screenshot, only (5) showed a watermark, not (6) nor (7). I
noticed that problem because my (7) didn't work as expected. And when I put
an eventTimeExtractor between (6) and (7), then (7) worked.

Typing all the way until now, I guess I now know where my issue came from: I
have not assigned timestamps/watermarks for (3) and (4) because I thought that
they are just idle sources of enrichment data.

/*Because of this, I have another question:*/
I read the text regarding idling sources [1], but I am not sure how to
implement that for my file sources. Could you please recommend a
solution / good practice here?

I have one more question about the recommendation [2] to emit timestamps and
watermarks from within the source function. Is there any way to do that with
the file sources?

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Timestamp and key preservation over operators

2019-04-29 Thread Averell
Hello,

I extracted timestamps using BoundedOutOfOrdernessTimestampExtractor from my
sources, have a WindowFunction, and found that my timestamps have been lost.
To do another window operation, I need to extract timestamps again. I tried
to find a document for that but haven't found one.
Could you please help tell which types of operators preserve records'
timestamps?

The same question for keyed streams. I have been using the same key
throughout my flow, but with many transformations (using different operators,
including CoProcessFunction, and converting my data between different
classes), and I have been trying to use
DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that as long
as I don't transform the key, I can use that reinterpretAsKeyedStream
function?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Sending FileInputSplit to the next operator

2019-04-28 Thread Averell
Hi,

*Problem descriptions*
I have two file sources with the same format, each with at most one new file
per tumbling window, and I need to merge data from those two sources. My
operator chain is as follows:
   FileReader1 --> Parser --\
                             Union -> WindowFunction (tumbling, merge)
   FileReader2 --> Parser --/
The parser is implemented within the "nextRecord()" method of my custom
FileInputFormat (MyInputFormatOriginal)

Worrying that the two streams would not be in sync and parsed data from one
stream would queue up in memory waiting for the other, I tried to change to:
   FileReader1 --\
                  Union -> WindowFunction (tumbling, parse, merge)
   FileReader2 --/
The two streams being unioned are just DataStream[FileInputSplit]. My new,
simplified custom FileInputFormat (MyInputFormatLite) is just like this:

/   class MyInputFormatLite extends FileInputFormat[FileInputSplit] {
        private var data: FileInputSplit = _   // the split itself is the "record" to emit
        private var end: Boolean = false
        override def open(split: FileInputSplit): Unit = {
            this.end = false
            this.data = split
            // I don't call super.open() here
        }
        override def nextRecord(ot: FileInputSplit): FileInputSplit = {
            this.end = true
            this.data
        }
        override def reachedEnd: Boolean = this.end
    }/

In the WindowFunction, I will actually read the files using my existing
parser class (MyInputFormatOriginal)
/   class myWindowFunction extends ProcessWindowFunction[FileInputSplit, ...] {
        private lazy val reader = new MyInputFormatOriginal()
        override def open(parameters: Configuration): Unit = {
            super.open(parameters)
            reader.setRuntimeContext(this.getRuntimeContext)
        }
        override def process(key: String, context: Context, elements: Iterable[FileInputSplit], ...): Unit = {
            elements.foreach(split => {
                reader.open(split)
                out.collect(reader.nextRecord(SomeEmptySplit))
                reader.close()
            })
        }
    }
/

*ISSUES*
My 2nd implementation processes the files well, but there are two big
issues:
        1. The performance is only half of that of the initial implementation (I
count the number of files processed when checkpointing is not running).
        2. The checkpointing process gets stuck at the window function. In the
1st implementation, with double the amount of data processed, each
checkpoint takes about 1-2 minutes, while in the 2nd one I have waited for up
to 30 minutes without seeing any subtask complete the checkpoint.

Could you please help tell me what is wrong in that 2nd implementation?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FileInputFormat that processes files in chronological order

2019-04-28 Thread Averell
Hi,

Regarding splitting by shards, I believe that you can simply create two
sources, one for each shard. After that, union them together.
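Something like this rough sketch (the paths, input format and scan interval
are just placeholders):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    def shardSource(dir: String): DataStream[String] =
        env.readFile(new TextInputFormat(new Path(dir)), dir,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)

    // one source per shard, then union them into a single stream
    val allShards = shardSource("s3://my-bucket/shard-1/").union(shardSource("s3://my-bucket/shard-2/"))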

Regarding processing files in chronological order, Flink currently reads
files in last-modified-time order (i.e. the oldest files will be processed
first). So if your file1.json is older than file2, and file2 is older than
file3, then you don't need to do anything.
If your file times are not in that order, then I think it's more complex. But
I am curious why there is such a requirement in the first place. Is this a
streaming problem?

I don't think FileInputFormat has anything to do here. Use that when your
files are in a format not currently supported by Flink.

Regards,
Averell  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Serialising null value in case-class

2019-04-28 Thread Averell
Thank you Timo.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Write batch function for apache flink

2019-04-28 Thread Averell
Hi Anurag,

Something like this one:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/
Is it what you are looking for?

Regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Serialising null value in case-class

2019-04-26 Thread Averell
Thank you Timo.

In terms of performance, does the use of Option[] have a performance impact? I
guess it does, because there will be one more layer of object handling, won't
there?
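For context, the change I am considering is roughly this (just a sketch):

    // nullable fields expressed as Option instead of a null-able Integer
    case class MyClass(ts: Long, s1: String, s2: String, i1: Option[Int], i2: Option[Int])

    val empty = MyClass(0L, null, null, None, None)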

I am also confused about choosing between primitive types (Int, Long) vs
object types (Integer, JLong). I have seen in many places in the Flink
documentation that Java primitive types are recommended. But what about Scala
types?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Identify orphan records after joining two streams

2019-04-26 Thread Averell
Hi Dawid,

I just tried to change from a CoProcessFunction with onTimer() to a
ProcessWindowFunction with a Trigger and a tumbling window. This way I can key
my stream by (id) instead of (id, eventTime). With this, I can use
/reinterpretAsKeyedStream/ and hope that it gives better performance.
I can also use the out-of-the-box function sideOutputLateData().
Not sure whether I will really benefit from that.
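The wiring I am trying is roughly as follows (a simplified sketch: Rec is a
placeholder for my record type, and CountTrigger.of(2) only stands in for the
custom trigger I would actually need, since a pure count trigger never fires
for a window that received only one record):

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    case class Rec(id: String, ts: Long, value: Double)

    class MergePair extends ProcessWindowFunction[Rec, (Rec, Option[Rec]), String, TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[Rec], out: Collector[(Rec, Option[Rec])]): Unit = {
            val recs = elements.toSeq
            out.collect((recs.head, recs.lift(1)))   // the second element is None for an orphan window
        }
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // stand-ins for my two real sources
    val stream1 = env.fromElements(Rec("a", 1000L, 1.0)).assignAscendingTimestamps(_.ts)
    val stream2 = env.fromElements(Rec("a", 2000L, 2.0)).assignAscendingTimestamps(_.ts)

    val lateTag = OutputTag[Rec]("late-records")
    val merged = stream1.union(stream2)
        .keyBy(_.id)
        .window(TumblingEventTimeWindows.of(Time.minutes(15)))
        .sideOutputLateData(lateTag)
        .trigger(CountTrigger.of[TimeWindow](2))
        .process(new MergePair)

    val lateRecords = merged.getSideOutput(lateTag)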

Thanks and regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Serialising null value in case-class

2019-04-26 Thread Averell
Good day,

I have a case-class defined like this:

case class MyClass(ts: Long, s1: String, s2: String, i1: Integer, i2: Integer)
object MyClass {
    val EMPTY = MyClass(0L, null, null, 0, 0)
    def apply(): MyClass = EMPTY
}

My code has been running fine (I was not aware of the limitation mentioned
in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html)

But when I tried to create the instance /MyClass(0L, null, null, *null*,
0)/, I got the following error: /org.apache.flink.types.NullFieldException:
Field 3 is null, but expected to hold a value./

I am confused. Why is there a difference between a null String and a null
Integer?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Identify orphan records after joining two streams

2019-04-18 Thread Averell
Thank you Hecheng.

I just tried to use the Table API as you suggested, and it almost worked (it
worked, with the two issues below):
- I only get the output when my event-time watermark goes past the end of the
tumbling window. But because I know that there are at most 2 records per
window (one from each stream), I would like to collect my output record as
soon as I have received two input records. With the low-level API, I believe I
can do this with a Trigger. Can I achieve a similar result with the Table API?
- In the UDAggF document, I saw a recommendation to use Java instead of
Scala. Does this apply to the low-level API functions as well?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Identify orphan records after joining two streams

2019-04-15 Thread Averell
Hello,

I have two data streams, and want to join them using a tumbling window. Each
of the streams would have at most one record per window. There is also a
requirement to log/save the records that don't have a companion from the
other stream.
What would be the best option for my case? Would it be possible to use
Flink's join?

I tried to use a CoProcessFunction: truncating the timestamp of each record to
the beginning of the tumbling window, and then keying ("keyBy") the two streams
by (key, truncated-timestamp). When I receive a record from one stream, if it
is the first record of the pair, I save it to a MapState. If it is the 2nd
record, I merge it with the 1st one and then fire.
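In code, the relevant part is roughly like this (simplified: a ValueState here
instead of the MapState I actually use, and Rec stands in for my real record
type):

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.util.Collector

    case class Rec(key: String, windowStart: Long, value: Double)

    // both inputs are keyed by (rec.key, rec.windowStart) before connect()
    class MergePairFn extends CoProcessFunction[Rec, Rec, (Rec, Rec)] {
        private lazy val pendingLeft: ValueState[Rec] =
            getRuntimeContext.getState(new ValueStateDescriptor[Rec]("pendingLeft", classOf[Rec]))
        private lazy val pendingRight: ValueState[Rec] =
            getRuntimeContext.getState(new ValueStateDescriptor[Rec]("pendingRight", classOf[Rec]))

        override def processElement1(left: Rec, ctx: CoProcessFunction[Rec, Rec, (Rec, Rec)]#Context,
                                     out: Collector[(Rec, Rec)]): Unit = {
            val right = pendingRight.value()
            if (right != null) { out.collect((left, right)); pendingRight.clear() } // 2nd record of the pair: merge and fire
            else pendingLeft.update(left)                                           // 1st record of the pair: park it
        }

        override def processElement2(right: Rec, ctx: CoProcessFunction[Rec, Rec, (Rec, Rec)]#Context,
                                     out: Collector[(Rec, Rec)]): Unit = {
            val left = pendingLeft.value()
            if (left != null) { out.collect((left, right)); pendingLeft.clear() }
            else pendingRight.update(right)
        }
    }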
This implementation works, but
(a) I feel that it's over-complicated, and
(b) I have a concern that when one stream is slower than the other, my cached
data would build up and make my cluster run out of memory. Would back-pressure
kick in for this case?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-03-15 Thread Averell
Hi Gary,

Thanks for the answer. I missed your most recent answer in this thread too.
However, my last question 

Averell wrote
> How about changing the configuration of the Flink job itself during
> runtime?
> What I have to do now is to take a savepoint, stop the job, change the
> configuration, and then restore the job from the save point.

was about changing job configuration (like parallelism, checkpoint
locations, checkpoint period,...), not about logback.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Where does the logs in Flink GUI's Exception tab come from?

2019-03-15 Thread Averell
Hi Gary,

Thanks a lot for the explanation, and sorry for missing your earlier
message.
I am clear now.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Where does the logs in Flink GUI's Exception tab come from?

2019-03-14 Thread Averell
Hi everyone,

I am running Flink on an EMR YARN cluster, and when the job failed and
restarted, I could see some logs in the Exception tab of the Flink GUI.

 

I could not find this piece of log on my cluster's hard disk - not in the TM or
JM logs.

Where can I find this?

Thanks.

Here below is my logback.xml. I'm not sure it has anything to do with my
question.



<configuration>
    <appender name="file" class="ch.qos.logback.core.FileAppender">
        <file>${log.file}</file>
        <append>false</append>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>
</configuration>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 parquet sink - failed with S3 connection exception

2019-03-14 Thread Averell
Hi Kostas and everyone,

I tried to change setFailOnCheckpointingErrors from true to false, and got
the following trace in the Flink GUI when the checkpoint/upload failed. Not
sure whether it is of any help in identifying the issue.

BTW, could you please tell me where to find the log file that the Flink GUI's
Exception tab is reading from?

Thanks and regards,
Averell

java.lang.ArrayIndexOutOfBoundsException: 122626
at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainLongDictionaryValuesWriter.fallBackDictionaryEncodedData(DictionaryValuesWriter.java:397)
at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.fallBackAllValuesTo(DictionaryValuesWriter.java:130)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.fallBack(FallbackValuesWriter.java:153)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.checkFallback(FallbackValuesWriter.java:147)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeLong(FallbackValuesWriter.java:181)
at
org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:228)
at
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addLong(MessageColumnIO.java:449)
at
org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:327)
at
org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at
org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 parquet sink - failed with S3 connection exception

2019-03-10 Thread Averell
Hi Kostas, and everyone,

Just some updates on my issue. I have tried to:
 * change the s3-related configuration in hadoop as suggested by the hadoop
document [1]: increased /fs.s3a.threads.max/ from 10 to 100, and
/fs.s3a.connection.maximum/ from 15 to 120. For reference, I have only
3 S3 sinks, with parallelisms of 4, 4, and 1.
 * follow AWS's document [2] to increase their EMRFS maxConnections to
200. However, I doubt that this makes any difference, as for creating
the S3 parquet bucket sink I needed to use an "s3a://..." path; "s3://..."
seems not to be supported by Flink yet.
 * reduce the parallelism of my S3 continuous file readers.

However, the problem still occurred randomly (randomly across job executions;
when it occurred, the only solution was to cancel the job and restart from the
last successful checkpoint).

Thanks and regards,
Averell

[1]  Hadoop-AWS module: Integration with Amazon Web Services
<https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A>
  
[2]  emr-timeout-connection-wait
<https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 parquet sink - failed with S3 connection exception

2019-03-05 Thread Averell
Hello Kostas,

Thanks for your time.

I started that job fresh, with the checkpoint interval set to 15 minutes. It
completed the first 13 checkpoints successfully and only started failing from
the 14th. I waited for about 20 more checkpoints, but all failed.
Then I cancelled the job, restored from the last successful checkpoint, and
there were no more issues.

Today, I had another try - restoring from yesterday's last successful
checkpoint. Result: I started getting the same error from the first checkpoint
after the restore.
I tried to cancel and restore again, and there has been no more issue until
now (35 more checkpoints already).

Regarding my job: I have 6 different S3-file-source streams
connected/unioned together, and then connected to a 7th S3-file-source
broadcast stream. Sinks are S3 parquet files and Elasticsearch.
Checkpointing is incremental and uses RocksDB.
This broadcast stream is one of the new changes to my job. The previous
version with 4 out of those 6 sources has been running well for more than a
month without any issue.
TM/JM logs for the first run today (the failure one) are attached.
The Yarn/EMR cluster is dedicated to the job.

I have a feeling that the issue comes from that broadcast stream (as
mentioned in the document, it doesn't use RocksDB). But not quite sure.

Thanks and regards,
Averell

logs.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/logs.gz>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


S3 parquet sink - failed with S3 connection exception

2019-03-04 Thread Averell
Hello everyone,

I have a job which writes some streams into parquet files in S3. I use
Flink 1.7.2 on EMR 5.21.
My job had been running well, but suddenly it failed to make a checkpoint,
with the full stack trace below. After that failure, the job restarted from
the last successful checkpoint, but it could not make any further checkpoints
- all subsequent checkpoints failed for the same reason.
Searching on the Internet I could only find one explanation: an S3Object has
not been closed properly.

Could someone please help?

Thanks and regards,
Averell


 /The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
8cf68432acb3b4ced2cbc97bc23b4af5 failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant
(1/4).
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 -
Instant (1/4).
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779

Re: Broadcast state before events stream consumption

2019-02-21 Thread Averell
Hi Konstantin,

The statement below is mentioned at the end of the page 
broadcast_state.html#important-considerations
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#important-considerations>
  
/"No RocksDB state backend: Broadcast state is kept in-memory at runtime and
memory provisioning should be done accordingly. This holds for all operator
states."/

I am using the RocksDB state backend, and am confused by that statement and
yours.

Could you please help clarify?

Thanks and regards,
Averell
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Stream enrichment with static data, side inputs for DataStream

2019-02-21 Thread Averell
Hi Artur,

Is it possible to make that "static" stream a keyed stream based on that
foreign key?
If yes, then just connect the two streams, keyed on that foreign key. In the
CoProcessFunction, for every record from the static stream, you write its
content into a ValueState; and for every record from the main stream, you read
the enrichment data from the saved ValueState to enrich that main-stream
record.
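Roughly like this (just a sketch, with placeholder types and field names):

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.util.Collector

    case class Main(fk: String, payload: String)   // the stream to enrich
    case class Ref(fk: String, extra: String)      // the "static" enrichment stream

    class EnrichFn extends CoProcessFunction[Main, Ref, (Main, Option[Ref])] {
        private lazy val refState: ValueState[Ref] =
            getRuntimeContext.getState(new ValueStateDescriptor[Ref]("ref", classOf[Ref]))

        // main stream: enrich with whatever we have stored for this key
        override def processElement1(m: Main, ctx: CoProcessFunction[Main, Ref, (Main, Option[Ref])]#Context,
                                     out: Collector[(Main, Option[Ref])]): Unit =
            out.collect((m, Option(refState.value())))

        // "static" stream: just remember the latest record for this key, emit nothing
        override def processElement2(r: Ref, ctx: CoProcessFunction[Main, Ref, (Main, Option[Ref])]#Context,
                                     out: Collector[(Main, Option[Ref])]): Unit =
            refState.update(r)
    }

    // usage: mainStream.keyBy(_.fk).connect(refStream.keyBy(_.fk)).process(new EnrichFn)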

If no, then I am having the same issue :D I am looking at Broadcast State, but
there is still something that doesn't look right to me.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-14 Thread Averell
Thank you Gordon and Ken.

My Flink job is now running well with 1.7.2 RC1, with failed ES requests
retried successfully.

One more question I have on this is how to limit the number of retries for
different types of errors with ES bulk requests. Is there any guideline on
that?

My temporary solution is to use the version field of each ES request -
increasing it every time I retry putting the request back into the queue.
This has worked for me so far, but it doesn't look right.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Averell
Hi Ken,

Thanks for that. But I could not find the changes included in Gordon's
mentioned pull request in the repository you gave me (e.g: the new class
/ElasticsearchFailureHandlerIndexer/). 
I have found this folder
https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it also
doesn't have that new class.
Maybe Gordon meant 1.7.2 rc2?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Averell
Hi Gordon,

Sorry for a noob question: how can I get the 1.7.2 RC build, or the code to
build it? I could not find any branch like that on GitHub.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-02-13 Thread Averell
Hi Gary,

Thanks for the suggestion. 

How about changing the configuration of the Flink job itself at runtime?
What I have to do now is take a savepoint, stop the job, change the
configuration, and then restore the job from the savepoint.

Is there any easier way to do that?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Averell
Thank you Gordon.

That's my exact  problem. Will try the fix in 1.7.2 now.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-08 Thread Averell
Hello,

I am trying to follow this Flink guide [1] to handle errors in the
ElasticSearchSink by re-adding the failed messages to the queue.
The error scenarios that I got and am going to retry are: (i) a conflict in
the UpdateRequest document version and (ii) a lost connection to
ElasticSearch. These errors are expected to be transient: (i) would be solved
by changing the version, and (ii) would be gone after some seconds.
What I expect is that the message gets retried successfully.
What I actually got was: Flink seemed to get stuck on that (first) retry, my
flow queued up (backpressure is 1 everywhere), and all processing hung.

Here is my error handling code:


private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
    override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                           restStatusCode: Int, indexer: RequestIndexer): Unit = {
        if (ExceptionUtils.findThrowableWithMessage(failure,
                "version_conflict_engine_exception") != Optional.empty()) {
            actionRequest match {
                case s: UpdateRequest =>
                    LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
                    LOG.warn(actionRequest.toString)
                    indexer.add(s.version(s.version() + 1))
                case _ =>
                    LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
                    LOG.error(actionRequest.toString)
                    throw failure
            }
        } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
            LOG.warn(s"Retrying record: ${actionRequest.toString}")
            actionRequest match {
                case s: UpdateRequest => indexer.add(s)
                case s: IndexRequest => indexer.add(s)
            }
        } else {
            LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
            LOG.error(s"DATA:\n    ${actionRequest.toString}")
            throw failure
        }
    }
}


Here is the extract from my task-manager logs:

/2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR
o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN 
c.n.c..sink.MyElasticSearchSink$  - Retrying record: update
{[idx-20190208][_doc][doc_id_154962270], doc_as_upsert[true], doc[index
{*[null][null][null]*, source[{...}]}], scripted_upsert[false],
detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=24 (max part counter=26)./

And job-manager logs:
/2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in
307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1./

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
P/S: This is the full stack trace

2019-02-07 01:53:12.790 [I/O dispatcher 16] ERROR
o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
Elasticsearch item request: [...][[...][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]]
org.elasticsearch.ElasticsearchException: Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]
at
org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
at
org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
at
org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
at
org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
at
org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
at
org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
at
org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
at
org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
at 
org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-02-07 Thread Averell
Hi Gary,

I am trying to reproduce that problem.
BTW, is it possible to change the log level (I'm using logback) for a running
job?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
Hello,

I am trying to implement error handling in the ElasticSearch sink (following
the seemingly outdated Flink document [1]).


override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                       restStatusCode: Int, indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(failure,
            classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) != Optional.empty()) {
        LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying",
            restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
        // Do something here
    } else {
        LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
    }
}


I tried to have different handling for the case of
VersionConflictEngineException, but failed. It always went to the "else"
branch, so my log message was:
/ELASTICSEARCH FAILED:
statusCode 409
message: Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]
/
Thanks and best regards,
Averell

[1]  handling-failing-elasticsearch-requests
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>
  




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-01-31 Thread Averell
Hi Gary,

I faced a similar problem yesterday, but I don't know the cause yet.
The situation that I observed is as follows:
 - At about 2:57, one of my EMR execution nodes (IP ...99) got disconnected
from the YARN resource manager (on the RM I could not see that node anymore),
despite the node still running. <<< This is another issue, but I believe it is
with YARN.
 - About 8 hours after that (between 10:00 - 11:00), I turned the problematic
EMR core node off. AWS spun up another node and added it to the cluster as a
replacement. The YARN RM soon recognized the new node and added it to its list
of available nodes.
However, the JM seemed not to be able to do anything after that. It kept
trying to start the job, failing after the timeout with that "no resource
available" exception again and again. No jobmanager logs were recorded after
2:57:15 though.

I am attaching the logs collected via "yarn logs --applicationId  here. But it
seems I am still missing something.

I am using Flink 1.7.1, with yarn-site configuration
yarn.resourcemanager.am.max-attempts=5. Flink configurations are all of the
default values.

Thanks and best regards,
Averell flink.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink.log>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-01-29 Thread Averell
Hi Gary,

Thanks for the help.

Gary Yao-3 wrote
> You are writing that it takes YARN 10 minutes to restart the application
> master (AM). However, in my experiments the AM container is restarted
> within a
> few seconds when after killing the process. If in your setup YARN actually
> needs 10 minutes to restart the AM, then you could try increasing the
> number
> of retry attempts by the client [2].

I think that comes from the difference in how we tested. When I tried to
kill the JM process (using kill -9 pid), a new process got created within some
seconds. However, when I tested by crashing the server (using init 0), it
needed some time. I found the yarn-site parameter for that timer:
yarn.am.liveness-monitor.expiry-interval-ms, which defaults to 10 minutes [1].
I increased the rest client configuration as you suggested, and it did help.


Gary Yao-3 wrote
> The REST API that is queried by the Web UI returns the root cause from the
> ExecutionGraph [3]. All job status transitions should be logged together
> with
> the exception that caused the transition [4]. Check for INFO level log
> messages that start with "Job [...] switched from state" followed by a
> stacktrace. If you cannot find the exception, the problem might be rooted
> in
> your log4j or logback configuration.

Thanks. I got the point.
I am using logback. I tried to configure rolling logs, but without success
yet. I will need to try more.

Thanks and regards,
Averell

[1]
https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml#yarn.am.liveness-monitor.expiry-interval-ms
<https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml#yarn.am.liveness-monitor.expiry-interval-ms>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoints failed with NullPointerException

2019-01-29 Thread Averell
I tried to create a savepoint on HDFS, and got the same exception:


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
028e392d02bd229ed08f50a2da5227e2 failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
failed: Could not perform checkpoint 35 for operator Merge sourceA
(7/16).
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
at
org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
at
org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint failed: Could not perform checkpoint 35 for operator Merge
sourceA (7/16).
at

Checkpoints failed with NullPointerException

2019-01-29 Thread Averell
Hi everyone,

I am getting NullPointerException when the job is creating checkpoints.
My configuration is: Flink 1.7.0 running on AWS EMR, using incremental
RockDBStateBackEnd on S3. Sinks are parquet files on S3 and ElasticSearch
(I'm not sure whether sinks are relevant to this error). There had been many
successful checkpoints before it started failing.

JobManager and TaskManagers' logs showed no issue.
Here below is the extract from the "Exception" tab in Flink GUI (I don't
know from where Flink GUI collected this): 

java.lang.Exception: Could not perform checkpoint 29 for operator Merge
sourceA (2/16).
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:273)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 29 for operator
Merge sourceA  (2/16).
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
... 8 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.snapshotMetaData(RocksIncrementalSnapshotStrategy.java:233)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:152)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:128)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:496)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
... 13 more

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-01-25 Thread Averell
Hi Gary,

Yes, my problem mentioned in the original post had been resolved by
correcting the zookeeper connection string.

I have two other relevant questions, if you have time, please help:

1. Regarding JM high availability, when I shut down the host on which the JM
is running, YARN detects the missing JM and starts a new one after 10 minutes,
and the Flink job is restored. However, on the console from which I submitted
the job, I got the following error message: "/The program finished with the
following exception:
org.apache.flink.client.program.ProgramInvocationException/" (full stack
trace in the attached file  flink_console_timeout.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink_console_timeout.log>
 
)
Is there any way to avoid this? If I run this as an AWS EMR job, the job
would be considered failed, while it is actually restored automatically by
YARN after 10 minutes.

2. Regarding logging, could you please explain the source of the error
messages shown in the "Exception" tab of the Flink job GUI (as per the
screenshot below)? I could not find any log file that has that message (not in
jobmanager.log nor in taskmanager.log in EMR's hadoop-yarn logs folder).
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-01-25_at_22.png>
 

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-01-23 Thread Averell
Hi Gary,

Thanks for your support.

I use flink 1.7.0. I will try to test without that -n.
Here below are the JM log (on server .82) and the TM log (on server .88). I'm
sorry that I missed that TM log before asking; I thought it would not be
relevant. I just fixed the issue with the connection to zookeeper and the
problem was solved.

Then I have another question: when the JM cannot start/connect to the TM on
.88, why didn't it try on .82 where resources are still available?

Thanks and regards,
Averell

Here is the JM log (from /mnt/var/log/hadoop-yarn/.../jobmanager.log on .82).
It seems irrelevant: even the earlier message regarding NoResourceAvailable
that was shown in the GUI is not found in the jobmanager.log file:

2019-01-23 04:15:01.869 [main] WARN 
org.apache.flink.configuration.Configuration  - Config uses deprecated
configuration key 'web.port' instead of proper key 'rest.port'
2019-01-23 04:15:03.483 [main] WARN 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Upload
directory
/tmp/flink-web-08279f45-0244-4c5c-bc9b-299ac59b4068/flink-web-upload does
not exist, or has been deleted externally. Previously uploaded files are no
longer available.

And here is the TM log:
2019-01-23 11:07:07.479 [main] ERROR
o.a.flink.shaded.curator.org.apache.curator.ConnectionState  - Connection
timed out for connection string (localhost:2181) and timeout (15000) /
elapsed (56538)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
KeeperErrorCode = ConnectionLoss
at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
at
org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90)
at
org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:158)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:32)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:242)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:175)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:154)
at
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.start(ZooKeeperLeaderRetrievalService.java:107)
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.start(TaskExecutor.java:277)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:168)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.lambda$run$0(YarnTaskExecutorRunner.java:142)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:141)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:75)
2019-01-23 11:07:08.224 [main-SendThread(localhost:2181)] WARN 
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x0
for server null, unexpected error, closing socket connection and attempting
reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


No resource available error while testing HA

2019-01-22 Thread Averell
Hello everyone,

I am testing High Availability of Flink on YARN on an AWS EMR cluster.
My configuration is an EMR with one master-node and 3 core-nodes (each with
16 vCores). Zookeeper is running on all nodes.
Yarn session was created with: flink-yarn-session -n 2 -s 8 -jm 1024m -tm
20g
A job with parallelism of 16 was submitted.

I tried to execute the test by terminating the core node (using Linux "init
0") that the JobManager was running on. The first few restarts worked well -
a new JobManager was elected, and the job was resumed properly.
However, after some restarts, the new JobManager could no longer acquire the
resources it needed (only one TM, on the node with IP .81, was shown in
the Task Managers GUI).
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Flink.png>
 

I kept getting the error message
"org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 108, slots allocated: 60".

Here below is what is shown in the YARN Resource Manager.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Yarn.png>
 

As per that screenshot, it looks like there are 2 task managers still
running (one on each of the hosts .88 and .81), which means the one on .88
has not been cleaned up properly. If so, how do I clean it up?

I also wonder: when the server hosting the JobManager crashes, is the whole
job restarted, or does the new JobManager try to connect to the running TMs
to resume the job?


Thanks and regards,
Averell

 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Thank you Stefan, I'll try to follow your guidance to debug.

And sorry for being unclear in the previous email. When I said "different
builds", I meant different versions of my application, not different builds
of Flink.

Between versions of my application, I do add/remove some operators. However,
as I mentioned in the first email, I got errors when restoring a savepoint
created by the same version of my application.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Hi Kostas, Stefan,

The problem doesn't occur with all of my builds, so it is a little bit
difficult to track down. Are there any specific classes that I can turn
DEBUG on for to help find the problem? (Turning DEBUG on globally seems too
much.) I will try to minimize the code and post it.

One more point that I noticed: the error doesn't stay on one single
operator but changes from time to time (even within the same build). For
example, the previous exception I quoted was with a Window operator, while
the one below is with CoStreamFlatMap.

Thanks and best regards,
Averell

Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for *CoStreamFlatMap*_68cd726422cf10170c4d6c7fd52ed309_(12/64)
from any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: When does Trigger.clear() get called?

2018-10-15 Thread Averell
Thank you Fabian.

All my doubts are cleared now.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Hi everyone,

In the StreamExecutionEnvironment.createFileInput method, a file source is
created as follows:

    SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName)
            .transform("Split Reader: " + sourceName, typeInfo, reader);

Does this create two different operators? If so, it seems impossible to
assign a UID to the first operator. And might that be the cause of my
problem?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: When does Trigger.clear() get called?

2018-10-13 Thread Averell
Hello Hequn,

Thanks for the answers.
Regarding question no. 2, I am now clear.
Regarding question no. 1, does your answer apply to custom states as well?
This concern of mine comes from Flink's implementation of CountTrigger,
in which a custom state is cleared explicitly in Trigger.clear():

    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

My third question was about ordinary, non-windowed keyed streams, where I
don't see any mention of using triggers in Flink's documentation - so how
can I clear the state of those streams?
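
Just to make the question concrete, here is a minimal sketch of what I mean
(the types and the "key finished" marker are made up): on a keyed stream I
keep a ValueState per key, and my assumption is that calling clear() on it
drops the state for the current key only.

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // Running total per key; a negative value is my hypothetical
    // "this key is finished" marker.
    class PerKeyTotal extends RichFlatMapFunction[(String, Long), (String, Long)] {
      @transient private var total: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit =
        total = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("total", classOf[java.lang.Long]))

      override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit =
        if (in._2 < 0) {
          total.clear()   // drops the state kept for the current key
        } else {
          val t = Option(total.value()).map(_.longValue()).getOrElse(0L) + in._2
          total.update(t)
          out.collect((in._1, t))
        }
    }

    // used as: stream.keyBy(_._1).flatMap(new PerKeyTotal)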

Thank you very much for your help.
Regards,
Averell
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: When does Trigger.clear() get called?

2018-10-13 Thread Averell
Hello Fabian,

So could I assume the following?

1. Neither PURGE nor clear() removes the states (so the states must be
explicitly cleared by the user).
2. When an event for a window arrives after PURGE has been called, it is
still processed, and is treated as the first event of that window.

And one related question: for keyed streams, if I know that some keys will
never receive new events anymore, should/could I remove the streams
corresponding to those keys so that I can free the memory allocated to their
metadata?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Identifying missing events in keyed streams

2018-10-13 Thread Averell
Thank you Fabian.

I tried (2), and it's working well.
One more benefit I found of (2) over (3) is that it allows me to easily
raise multiple levels of alarms for each keyed stream (e.g. minor: missed 2
cycles, major: missed 5 cycles, ...).
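
For reference, this is roughly what my approach-(2) operator looks like - a
simplified sketch with made-up record/alarm types and event-time timers; the
real one has more bookkeeping:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    case class Reading(meterId: String, ts: Long)
    case class Alarm(meterId: String, level: String)

    class MissedCycleDetector(cycleMs: Long)
        extends KeyedProcessFunction[String, Reading, Alarm] {

      @transient private var lastSeen: ValueState[java.lang.Long] = _
      @transient private var nextTimer: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        lastSeen = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long]))
        nextTimer = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("nextTimer", classOf[java.lang.Long]))
      }

      override def processElement(r: Reading,
          ctx: KeyedProcessFunction[String, Reading, Alarm]#Context,
          out: Collector[Alarm]): Unit = {
        // remove the previously registered timer and arm a new one
        val prev = nextTimer.value()
        if (prev != null) ctx.timerService().deleteEventTimeTimer(prev)
        val next = r.ts + 2 * cycleMs
        ctx.timerService().registerEventTimeTimer(next)
        nextTimer.update(next)
        lastSeen.update(r.ts)
      }

      override def onTimer(timestamp: Long,
          ctx: KeyedProcessFunction[String, Reading, Alarm]#OnTimerContext,
          out: Collector[Alarm]): Unit = {
        val last = lastSeen.value()
        if (last != null) {
          val missed = (timestamp - last.longValue()) / cycleMs
          if (missed >= 5) out.collect(Alarm(ctx.getCurrentKey, "major"))
          else if (missed >= 2) out.collect(Alarm(ctx.getCurrentKey, "minor"))
          // keep watching this key for further escalation
          val next = timestamp + cycleMs
          ctx.timerService().registerEventTimeTimer(next)
          nextTimer.update(next)
        }
      }
    }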

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Identifying missing events in keyed streams

2018-10-11 Thread Averell
Hi Fabian,

Thanks for the suggestion.
I will try that support for removing timers.

I have also tried approach (3) - using session windows - and it works: I set
the session gap to 2 minutes and use an aggregate window function to keep
the amount of in-memory data for each keyed stream to a minimum.
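
Roughly what approach (3) looks like on my side - a simplified sketch with a
made-up record type; `events` stands for my input DataStream (with event-time
timestamps/watermarks already assigned):

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Reading(meterId: String, ts: Long)

    // keep only a count and the latest timestamp per key,
    // instead of buffering the events themselves
    class CountAndLast extends AggregateFunction[Reading, (Long, Long), (Long, Long)] {
      override def createAccumulator(): (Long, Long) = (0L, 0L)
      override def add(r: Reading, acc: (Long, Long)): (Long, Long) =
        (acc._1 + 1, math.max(acc._2, r.ts))
      override def getResult(acc: (Long, Long)): (Long, Long) = acc
      override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
        (a._1 + b._1, math.max(a._2, b._2))
    }

    // events: DataStream[Reading]
    val gapDetected = events
      .keyBy(_.meterId)
      .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
      .aggregate(new CountAndLast)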

Could you please explain why (2) is better?

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-11 Thread Averell
Hi Kostas,

Thanks for the info. That error was caused by building your code against an
out-of-date baseline. After rebasing my branch build, there is no such issue
any more.
I've been testing, and so far I have some questions/issues, listed below:

1. I'm not able to write to S3 with the URI format s3://, and had to use
s3a://. Is this behaviour expected? (I am running Flink on AWS EMR, and I
thought that EMR provides a wrapper for HDFS over S3 with something called
EMRFS.)

2. Occasionally/randomly I got the message below ( parquet_error1.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error1.log>
). I'm using the ParquetAvroWriters.forReflectRecord() method to write Scala
case classes (the sink setup is sketched after question 4 below). Re-running
the job doesn't reproduce the error at the same data location, so I don't
think that there's an issue with the data.
 java.lang.ArrayIndexOutOfBoundsException at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.fallBackDictionaryEncodedData.

3. Sometimes I got this error message when I used a parallelism of 8 for the
sink ( parquet_error2.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error2.log>
).
Reducing it to 2 solves the issue, but is it possible to increase the pool
size? I could not find any place where I can change the
fs.s3.maxconnections parameter.
java.io.InterruptedIOException: initiate MultiPartUpload on
Test/output/dt=2018-09-20/part-7-5:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool

4. Where is the temporary folder in which you store the parquet files before
uploading them to S3?
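
For question 2 above, this is roughly how the sink is set up (a sketch only -
SdcRecord stands for my Scala case class, and the S3 path is a placeholder):

    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    case class SdcRecord(ts: Long, meterId: String, port: String)

    val parquetSink = StreamingFileSink
      .forBulkFormat(
        new Path("s3a://<my-bucket>/Test/output"),
        ParquetAvroWriters.forReflectRecord(classOf[SdcRecord]))
      .build()

    // stream.addSink(parquetSink).setParallelism(8)
    // (parallelism 8 is where I hit the connection-pool issue in question 3)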

Thanks a lot for your help.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Kostas,

No, the same code was used.
I (1) started the job, (2) created a savepoint, (3) cancelled the job, (4)
restored the job with the same command as in (1), with the addition of the
"-s" option pointing at the savepoint path.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Kostas,

Yes, I modified ContinuousFileMonitoringFunction to add one more
ListState. The error might well have come from that, but I haven't been able
to find out why.
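
To be clear about the kind of change I made, here is a sketch of its shape
only (not the actual Flink code; the state name and content are made up) - an
extra piece of operator ListState registered next to the existing one:

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

    class MyMonitoringFunction extends CheckpointedFunction {
      @transient private var extraState: ListState[String] = _

      override def initializeState(context: FunctionInitializationContext): Unit = {
        // the function's existing operator state is restored here as well
        extraState = context.getOperatorStateStore.getListState(
          new ListStateDescriptor[String]("extra-paths", classOf[String]))
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        extraState.clear()
        extraState.add("value-to-checkpoint")
      }
    }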

All of my keyed streams are keyed by Scala tuples, like keyBy(r =>
(r.customer_id, r.address)), and the fields used as keys are of types
either String or Long. For this, I don't have to define equals and hashCode
methods, do I?
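
Just to illustrate (field names made up) - my understanding is that Scala
tuples and case classes already come with structural equals/hashCode
generated by the compiler:

    import org.apache.flink.streaming.api.scala._

    case class Enrichment(customer_id: Long, address: String, value: Double)

    // records: DataStream[Enrichment]
    val keyed = records.keyBy(r => (r.customer_id, r.address))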

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. The environment's parallelism
stayed at 64. Some source/sink operators have a parallelism of 1 to 8. I'm
using Flink 1.7-SNAPSHOT, with the code pulled from the master branch about 5
days back. The savepoint was saved to either S3 or HDFS (I tried multiple
times), and had not been moved.

Is there any kind of improper user code that can cause such an error?

Thanks for your support.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi everyone,

I'm getting the following error when trying to restore from a savepoint.
Here below is the output from the flink CLI, and the attachment contains a TM
log. I didn't make any change to the app between taking the savepoint and
restoring it. All window operators have been assigned unique ID strings.
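
For reference, this is how the IDs are assigned - a simplified sketch with
made-up names and a made-up reduce function; `records` stands for one of the
input streams in the job:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class SdcRecord(ts: Long, dslam: String, value: Double)

    // records: DataStream[SdcRecord]
    val averaged = records
      .keyBy(_.dslam)
      .timeWindow(Time.minutes(15))
      .reduce((a, b) => if (a.ts > b.ts) a else b)
      .uid("average-by-dslam-window")
      .name("average-by-dslam-window")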

Could you please help give a look?

Thanks and best regards,
Averell

taskmanager.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/taskmanager.gz>
  

org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 606ad5239f5e23cedb85d3e75bf76463)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
at
com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
at
com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 22 more
Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64)
from any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.ja

Re: Streaming to Parquet Files in HDFS

2018-10-07 Thread Averell
Hi Kostas,

I'm using a build with your PR. However, it seems the issue is not with S3:
when I tried writing to the local file system (file:///, not HDFS), I also
got the same problem - only the first part was published. All remaining parts
stayed in-progress and had names prefixed with ".".

From the Flink GUI, all checkpoints were shown as completed successfully.

How could I debug this further?

Thanks a lot for your help.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-06 Thread Averell
Hi Kostas,

Please ignore my previous email about the issue with security. It seems I
had mixed versions of shaded and unshaded jars.

However, I'm now facing another issue with writing parquet files: only the
first part is closed. All subsequent parts are kept in the in-progress state
forever. My settings are to checkpoint every 3 minutes. Sink parallelism is
set to 1 (my tries with 4 or 30 showed no difference). The bucket-ID
assigner uses the event timestamp.
I only get this issue when running Flink on a YARN cluster, whether writing
to file:/// or to S3. When I ran it on my laptop, I got one part for every
single checkpoint.
The TM log says something like "BucketState ... has pending files for
checkpoints: {2 }".
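
This is roughly what the bucket-ID assigner looks like (a sketch; the record
type and field names are made up, and ts is assumed to be epoch seconds):

    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

    case class SdcRecord(ts: Long, meterId: String, port: String)

    class EventTimeBucketAssigner extends BucketAssigner[SdcRecord, String] {
      @transient private lazy val fmt = new SimpleDateFormat("'dt='yyyy-MM-dd")

      override def getBucketId(element: SdcRecord,
                               context: BucketAssigner.Context): String =
        fmt.format(new Date(element.ts * 1000L))

      override def getSerializer = SimpleVersionedStringSerializer.INSTANCE
    }

    // attached with .withBucketAssigner(new EventTimeBucketAssigner) on the
    // StreamingFileSink builder; checkpointing is enabled with
    // env.enableCheckpointing(3 * 60 * 1000)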

Could you please advise on how I can debug this further?

Here below is the TM log:

2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing

Re: Utilising EMR's master node

2018-10-06 Thread Averell
Hi Gary,

Thanks for the information. I didn't know that -yn is obsolete :( I am using
Flink 1.6.
Not sure whether that's a bug when setting -yn explicitly, but I started
only 1 cluster.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


"unable to establish the security context" with shaded Hadoop S3

2018-10-05 Thread Averell
Hi everyone,

I'm trying a build that includes PR 6795
<https://github.com/apache/flink/pull/6795> for the S3 recoverable writer, to
write my stream into parquet files on S3 with Flink running on AWS EMR, and I
get the error "unable to establish the security context" with the full
stack trace below.
The shading of the hadoop jars started with the ticket  FLINK-10366
<https://issues.apache.org/jira/browse/FLINK-10366>  . Googling the error
didn't help. Could someone please help me?

Thanks and best regards,
Averell

/Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
set.
java.lang.Exception: *unable to establish the security context*
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
*org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider*
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
at org.apache.hadoop.security.Groups.(Groups.java:106)
at org.apache.hadoop.security.Groups.(Groups.java:101)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)
... 8 more
/




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-05 Thread Averell
Hi Kostas,

I tried your PR - trying to write to S3 from Flink running on AWS, and I got
the following error. I copied the three jar files
flink-hadoop-fs-1.7-SNAPSHOT.jar, flink-s3-fs-base-1.7-SNAPSHOT.jar,
flink-s3-fs-hadoop-1.7-SNAPSHOT.jar to lib/ directory. Do I need to make any
change to HADOOP configurations?

Thanks and best regards,
Averell

java.lang.Exception: unable to establish the security context
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1118)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
at org.apache.hadoop.security.Groups.(Groups.java:106)
at org.apache.hadoop.security.Groups.(Groups.java:101)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-05 Thread Averell
What a great news.
Thanks for that, Kostas.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


  1   2   >