JSON to Parquet

2020-08-20 Thread Averell
Hello,

I have a stream in which each message is a JSON string with a fairly complex
schema (multiple fields, multiple nested layers), and I need to write that
into Parquet files after some slight modification/enrichment.

I wonder what options are available for me to do that. I'm thinking of JSON
-> Avro (GenericRecord) -> Parquet. Is that an option? I would want to be
able to change the JSON schema quickly/dynamically, with as few code changes
as possible.

Thanks and regards,
Averell





SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-27 Thread Averell
Hello,

I have a Flink 1.10 job which runs on AWS EMR, checkpointing to S3A as well
as writing output to S3A using StreamingFileSink. The job runs well until I
add the Hadoop property -Dfs.s3a.acl.default=BucketOwnerFullControl. After
that, the checkpoint process fails to complete:

Caused by: org.xml.sax.SAXException: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found

I tried adding a jar file with that class
(https://mvnrepository.com/artifact/xerces/xercesImpl/2.12.0) to my
flink/lib/ directory, and then got the same error but with a different stack trace:
Caused by: org.apache.flink.util.SerializedThrowable: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found

This seems to be a dependency conflict, but I couldn't track down its root cause.
In my IDE I didn't have any dependency issues, and I couldn't find
SAXParser in the dependency tree.

Here is the stack trace when the jar file is not there:
Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on
s3a://mybucket/checkpoint/a9502b1c81ced10dfcbb21ac43f03e61/chk-2/41f51c24-60fd-474b-9f89-3d65d87037c7:
com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to create
an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:164)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 17 more
Caused by: com.amazonaws.SdkClientException: Couldn't initialize a SAX
driver to create an XMLReader
at
com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.(XmlResponsesSaxParser.java:118)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:87)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
at
com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4266)
at
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:876)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$5(S3AFileSystem.java:1262)
at
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoke

Re: JSON to Parquet

2020-08-27 Thread Averell
Hi Dawid,

Thanks for the suggestion. So, basically, I'll need to use the JSON connector
to get the JSON strings into Rows, and then write the Rows out as Parquet
records using the Parquet connector?
I have never tried the Table API before; I have only been using the
DataStream API. I will follow your suggestion now.
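For my own notes, a rough sketch of how I understand this would look - this assumes
Flink 1.11's SQL DDL syntax, a Kafka source topic, and the flink-json / flink-parquet
format dependencies on the classpath; the table names, topic, bootstrap servers, S3
path, and the two-field schema are just placeholders for my real (nested) schema:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(
      env, EnvironmentSettings.newInstance().inStreamingMode().build())

    // JSON source: nested JSON layers are declared as ROW<...> columns
    tEnv.executeSql(
      """CREATE TABLE json_source (
        |  id STRING,
        |  payload ROW<field1 STRING, field2 STRING>
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'my_topic',
        |  'properties.bootstrap.servers' = 'broker:9092',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json'
        |)""".stripMargin)

    // Parquet sink on the filesystem connector
    tEnv.executeSql(
      """CREATE TABLE parquet_sink (
        |  id STRING,
        |  payload ROW<field1 STRING, field2 STRING>
        |) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 's3://mybucket/output',
        |  'format' = 'parquet'
        |)""".stripMargin)

    // The slight modification/enrichment would go into this query
    tEnv.executeSql("INSERT INTO parquet_sink SELECT id, payload FROM json_source")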

Thanks for your help.
Regards,
Averell





Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-31 Thread Averell
Hello Robert, Arvid,

As I am running on EMR, AWS currently only supports version 1.10.
I tried both solutions that you suggested ((i) copying a SAXParser
implementation to the plugins folder and (ii) using the S3 filesystem plugin
from 1.10.1), and both worked - I now get successful checkpoints.

However, my checkpoints still fail intermittently (about 10% of the time).
Whenever one fails, there are incomplete files left in S3 (screenshot
attached below).
I'm not sure whether those incomplete files are expected, or whether that is a bug.

Thanks and regards,
Averell
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2020-08-28_at_11.png>
 





Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-09-12 Thread Averell
Hello Robert,

I'm not sure why the screenshot I attached in the previous post was not
shown; I'm re-attaching it in this post.
As shown in the screenshot, part-1-33, part-1-34, and part-1-35 have
already been closed, but the temp file for part-1-33 is still there.

Thanks and regards
Averell 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/FlinkFileSink.png>
 





HA on AWS EMR

2020-10-18 Thread Averell
Hi,

I'm trying to enable HA for my Flink jobs running on AWS EMR.
Following [1], I created a common Flink YARN session and am submitting all my
jobs to that one. These 4 config params were added:
high-availability = zookeeper
high-availability.storageDir =  
high-availability.zookeeper.path.root = /flink
high-availability.zookeeper.quorum = :2181
(The Zookeeper that comes with EMR was used.)

The command to start that Flink YARN session is like this:
`flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm FlinkCommonSession -z FlinkCommonSession -d`

The first HA test - YARN application killed - went well. I killed that
common session using `yarn application --kill ` and created a
new session using the same command; the jobs were restored
automatically after that session was up.

However, the 2nd HA test - EMR cluster crashed - didn't work: the jobs were
not restored after the common session was created on the new EMR cluster
(jobmanager.gz attached:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>
).

Should I expect the jobs to be restored in scenario no. 2 - EMR
cluster crashed?
Am I missing something here?

Thanks for your help.

Regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html






Re: HA on AWS EMR

2020-10-19 Thread Averell
Hello Roman,

Thanks for your time.
I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node.
yarn.application-attempts is not set (does that mean unlimited?), while
yarn.resourcemanager.am.max-attempts is 4.

By "EMR cluster crashed" I meant the cluster is lost. Some scenarios
which could lead to this are:
  - The master node goes down
  - The cluster is accidentally / deliberately terminated

I found a thread in our mailing list [1] in which Fabian mentioned a
"pointer" stored in Zookeeper. It looks like this piece of information is
stored in Zookeeper's dataDir, which by default lives on the local
storage of the EMR master node. I'm trying to move it to an EFS, in the
hope that this would help. I'm not sure whether this is the right approach.

Thanks for your help.
Regards,
Averell


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html





Re: HA on AWS EMR

2020-10-21 Thread Averell
Hello Roman,

Thanks for the answer.
I already have high-availability.storageDir configured to an S3
location. Our service is not critical enough, so to save cost we are
using the single-master EMR setup. I understand that we won't get YARN HA
in that case, but what I expect here is the ability to quickly restore the
service in the case of a failure. Without Zookeeper, when such a failure
happens, I need to find the last checkpoint of each job and restore from
there. With the HA feature, I can just start a new
flink-yarn-session and all jobs will be restored.

I tried changing the Zookeeper dataDir config to an EFS location which both the
old and new EMR clusters could access, and that worked.

However, now I have a new question: is it expected that jobs can be restored to a new
version of Flink (e.g. saved with Flink 1.10 and restored with Flink 1.11)? I
tried and got the error messages attached below. I'm not sure whether that's a bug or
expected behaviour.

Thanks and best regards,
Averell


07:39:33.906 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 6e5c12f1c352dd4e6200c40aebb65745.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_265]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate
JobManager.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(

KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

2020-10-21 Thread Averell
Hi,

I'm trying to convert a stream of JSON strings to a stream of Avro
GenericRecords and write them to Parquet files, but I get the exception
KryoException: java.lang.UnsupportedOperationException
The exception is thrown at the line out.collect(genericRecord). If there's no
sink, then there's no error.

My code is as follows:

    val parquetSink: StreamingFileSink[GenericRecord] =
      StreamingFileSink
        .forBulkFormat(new Path(path),
          ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
        .build()

    val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
      @transient private var schema: Schema = _
      @transient private var reader: GenericDatumReader[GenericRecord] = _

      override def processElement(value: String,
                                  ctx: ProcessFunction[String, GenericRecord]#Context,
                                  out: Collector[GenericRecord]): Unit = {
        if (reader == null) {
          schema = new Schema.Parser().parse(schemaString)
          reader = new GenericDatumReader[GenericRecord](schema)
        }
        try {
          val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
          out.collect(genericRecord)
        } catch {
          case e: Throwable =>
            LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
            throw e
        }
      }
    })
    parquetStream.addSink(parquetSink)

The schema is a simple one in which all fields are strings.
I tried with both Flink 1.10.0 and 1.11.0, and I'm currently stuck at this point.
Could you please help?

Thanks and regards,
Averell



com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxPr

Re: KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

2020-10-26 Thread Averell
Hello Till,

Adding GenericRecordAvroTypeInfo(schema) does help.
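For reference, a minimal sketch of where I ended up supplying it - the explicit
implicit val is one way to hand the Avro-aware type information to the Scala API so
that it does not fall back to Kryo for GenericRecord; schemaString, inputStream,
parquetSink and jsonToGenericRecordFunction refer to the code from my earlier post:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

    // A local implicit takes precedence over the macro-derived TypeInformation,
    // so process(...) uses the Avro type info for its GenericRecord output.
    implicit val genericRecordTypeInfo: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaString))

    // jsonToGenericRecordFunction stands for the ProcessFunction[String, GenericRecord]
    // from the original post
    val parquetStream = inputStream.process(jsonToGenericRecordFunction)
    parquetStream.addSink(parquetSink)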

Thanks and best regards,
Averell





Re: HA on AWS EMR

2020-10-27 Thread Averell
Hello Robert,

Thanks for the info. That makes sense. I will take savepoints of my jobs and
cancel them on 1.10, upgrade to 1.11, and restore the jobs from the savepoints.

Thanks and regards,
Averell





Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
Hi,

I'm on 1.11.0, with a streaming job running in a YARN session, reading from
Kinesis.
I tried to stop the job via REST, with "drain=false". After that POST
request I got back a request_id (I'm not sure what I should use that for).

Checking the job in the GUI, I could see that a savepoint had completed
successfully (my job has next to zero state, so that was very quick). The
watermark stopped increasing, and there were no more checkpoints after that.
However, the job's status is still ACTIVE.

Querying the job details via REST showed that the job is not stoppable (I guess
this is misleading information), and timestamps.RUNNING is not
increasing.

/  "name": "MyFlinkJob",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1604016319260,
  "end-time": -1,
  "duration": 1166337471,
  "now": 1605182656731,
  "timestamps": {
"CANCELLING": 0,
"FAILING": 0,
"CANCELED": 0,
"FINISHED": 0,
"RUNNING": 1604016319495,
"FAILED": 0,
"RESTARTING": 0,
"CREATED": 1604016319260,
"RECONCILING": 0,
"SUSPENDED": 0
  },
/

Is this a known bug? Or is it an expected behaviour?
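One thing I found while digging (noting it here in case it helps): the request_id
returned by the stop call appears to be a savepoint trigger id, and as far as I can
tell its status can be polled like this (host, port, job id and trigger id below are
placeholders):

    curl -s http://<jm-host>:<port>/jobs/<jobid>/savepoints/<request_id>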
Thanks and best regards,
Averell





Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
I have some updates - some weird behaviours were found. Please refer to the
attached photo.

All requests were sent via the REST API.

The status of the savepoint triggered by that stop request (ID 11018) is
"COMPLETED [Savepoint]"; however, no checkpoint data has been persisted (in S3).
The folder `savepoint-5871af-c0f2d2334501/_metadata/` has been created in
S3, but there are no files in it.
This is the command I used to send the first stop request:
curl -s -d '{"drain": false, "targetDirectory":"s3://mybucket/savepoint"}' -H 'Content-Type: application/json' -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop

Suspecting that the s3:// scheme might be the issue, I sent another stop
request (ID 11020), mistakenly typing the scheme as s3s://, so it failed.

Another stop request was sent (ID 11021), this time with s3a://. This one
failed after a timeout (10 minutes); the GUI says the checkpoint failed with
`Checkpoint expired before completing`.
curl -s -d '{"drain": false, "targetDirectory":"s3a://mybucket/savepoint"}' -H 'Content-Type: application/json' -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop

I then sent a create-savepoint request (ID 11023), and this time it
completed successfully, with files persisted to S3. Checking the Flink GUI, I
could see that the job had actually resumed before that savepoint request (with
the checkpoint ID 11021 created just 30 seconds after 11021 expired).
curl -s -d '{"target-directory":"s3a://mybucket/savepoint", "cancel-job": false}' -H 'Content-Type: application/json' -X POST http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/savepoints


 





Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I have the following code:

    StreamingFileSink
      .forRowFormat(new Path(path), myEncoder)
      .withRollingPolicy(DefaultRollingPolicy.create().build())
      .withBucketAssigner(myBucketAssigner)
      .build()

This works fine in Flink 1.8.3. However, when I try to compile with
Flink 1.10.0, I get the following error:

    value build is not a member of ?0
    possible cause: maybe a semicolon is missing before `value build'?

According to the hint from IntelliJ:
    .forRowFormat returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
    .withRollingPolicy(...) returns a RowFormatBuilder[_]
    .withBucketAssigner(...) returns Any

I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. I tried with and
without IntelliJ; no difference.

I'm not sure what's wrong.

Thanks!
Averell






Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi Sivaprasanna,

That is a compile-time error, not a runtime error:

    value build is not a member of ?0
    possible cause: maybe a semicolon is missing before `value build'?

There is no issue when using either withRollingPolicy() or
withBucketAssigner() alone; the error only appears when using both.

Thanks and regards,
Averell





Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I tried adding the following cast, and it works. It doesn't look nice though:

    StreamingFileSink
      .forRowFormat(new Path(path), myEncoder)
      .withRollingPolicy(DefaultRollingPolicy.create().build())
      .withBucketAssigner(myBucketAssigner).asInstanceOf[RowFormatBuilder[IN, String, _]]
      .build()

Thanks and regards,
Averell





Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Averell
Hello Leonard, Sivaprasanna,

But my code was working fine with Flink 1.8.
I also tried with a simple String DataStream and got the same error:

    StreamingFileSink
      .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
      .withRollingPolicy(DefaultRollingPolicy.builder().build())
      .withBucketAssigner(new DateTimeBucketAssigner)
      .build()

(screenshot below)

It's weird. At first I thought something was wrong with IntelliJ, but I got
the same error when running mvn from the command line.








Re: Change to StreamingFileSink in Flink 1.10

2020-04-22 Thread Averell
Thanks @Seth Wiesman and all.





K8s native - checkpointing to S3 with RocksDBStateBackend

2020-04-23 Thread Averell
Hi,
I am trying to deploy my job to Kubernetes following the native Kubernetes
guide. My job checkpoints to S3 with the RocksDBStateBackend, and it also has
an S3 StreamingFileSink.
In my jar file I already have flink-hadoop-fs,
flink-connector-filesystem, and flink-s3-fs-hadoop (as I understand it, these
are for the S3 sink - please correct me if I'm wrong).

When I tried to submit the job, I got the following error (only a few
seconds after submitting): Could not find a file system implementation for
scheme 's3'. The scheme is not directly supported by Flink and no Hadoop
file system to support this scheme could be loaded.

I'm not sure how to get past this.
Using s3a didn't help (s3 works well when running on my dev machine).
I also tried copying flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to
the /opt/flink/lib/ folder of the JobManager pod, but it didn't help (is
that already too late? should the jar be there before the JM is started?)

Thanks for your help.
Averell


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
checkpoint storage at checkpoint coordinator side.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:282)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:205)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:215)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:120)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:266)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3a'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:64)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:477)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:279)
... 23 more





Re: Question about Scala Case Class and List in Flink

2020-04-24 Thread Averell
Hi Timo,

This is my case class:

    case class Box[T](meta: Metadata, value: T) {
      def function1: A => B = {...}
      def method2(...): A = {...}
    }

However, I still get the warning "Class class data.package$Box cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance."

I imported org.apache.flink.streaming.api.scala._ - is that enough to
tell Flink that I am using the Scala API?
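In case it clarifies the question, this is the pattern I am experimenting with - my
reading of the docs is that for a generic parameter like T, the Scala API needs a
TypeInformation[T] in scope wherever the DataStream of Box[T] is created, e.g. via a
context bound; wrap below is just an illustrative helper of mine:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._

    // With T: TypeInformation in scope, the Scala type-information macro can derive
    // a CaseClassTypeInfo for Box[T] instead of falling back to a GenericType.
    def wrap[T: TypeInformation](stream: DataStream[T], meta: Metadata): DataStream[Box[T]] =
      stream.map(value => Box(meta, value))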

Thanks and regards,
Averell





Re: K8s native - checkpointing to S3 with RocksDBStateBackend

2020-04-24 Thread Averell
Thank you Yun Tang.
Building my own Docker image as suggested solved my problem.

However, I don't understand why I need that when I already have the
flink-s3-fs-hadoop jar included in my uber jar.
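For reference, the image is roughly this - a sketch, assuming the official 1.10 image
and the flink-s3-fs-hadoop jar shipped under /opt/flink/opt; as I understand it, the
S3 filesystems are loaded through the plugins mechanism, which is why the copy bundled
inside my uber jar is not picked up:

    FROM flink:1.10.0-scala_2.11
    # Copy the bundled S3 filesystem into its own plugins sub-directory
    RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
        cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/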

Thanks.
Regards,
Averell





Re: K8s native - checkpointing to S3 with RocksDBStateBackend

2020-04-26 Thread Averell
Hi David, Yang,

Thanks. But I just tried submitting the same job on a YARN cluster using that
same uber jar, and it was successful, even though I don't have
flink-s3-fs-hadoop.jar anywhere in the lib or plugins folder there.

Thanks and regards,
Averell





Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi,

I'm trying to configure Flink running in native Kubernetes mode to push some metrics
to New Relic (using a custom ScheduledDropwizardReporter).

From the logs, I can see that an instance of ScheduledDropwizardReporter
was created successfully (the overridden getReporter() method
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java#L234>
was called).
An instance of MetricRegistryImpl
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L141>
was also created successfully (this log line was shown: "Periodically reporting
metrics in intervals of 30 SECONDS for reporter my_newrelic_reporter").

However, the report() method
<https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java#L30>
was never called.

When running on my laptop, there's no issue at all.
Is there anything special that I need to take care of when running in
Kubernetes?

Thanks a lot.

Regards,
Averell







Re: Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi Gary,

Thanks for the help.
Below is the output from jstack; it does not seem to be blocked.



In my JobManager log there is this WARN; I am not sure whether it's relevant
at all.


Attached is the full jstack dump  k8xDump.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/k8xDump.txt>
 
.

Thanks and regards,
Averell





Re: Flink Metrics in kubernetes

2020-05-13 Thread Averell
Hi Gary,

Sorry for the false alarm. It was caused by a bug in my deployment - no
metrics were added to the registry.
Sorry for wasting your time.

Thanks and best regards,
Averell 





Automatically resuming failed jobs in K8s

2020-06-10 Thread Averell
Hi,
I'm running some jobs using native Kubernetes. Sometimes, because of an unrelated
issue with our K8s cluster (e.g. a K8s node crashing), my Flink pods are gone.
The JM pod, as it is deployed using a Deployment, is re-created
automatically. However, all of my jobs are lost.
What I have to do now is:
1. Re-upload the jars
2. Find the path to the last checkpoint of each job
3. Resubmit the jobs

Is there any existing option to automate those steps? E.g.:
1. Can I use a jar file stored on the JM's file system or on S3, instead of
uploading the jar file via the REST interface?
2. When restoring a job, I need to provide the full path of the last
checkpoint (s3://.../chk-2345/). Is there any option
to just provide the base path?
3. Can I store the info needed to restore the jobs in the K8s deployment config?
Thanks a lot.

Regards,
Averell





Re: Automatically resuming failed jobs in K8s

2020-06-12 Thread Averell
Thank you very much, Yang.





Preserving (best effort) messages order between operators

2019-10-30 Thread Averell
Hi, 

I have a source function with parallelism = 1, sending out records ordered
by event time. These records are then rebalanced to the next operator, which
has parallelism > 1. I observed that within each subtask of the 2nd
operator, the order of the messages is not maintained. Is this behaviour
expected? If it is, is there any way to avoid it, or at least reduce it?
I have high back-pressure on that 2nd operator, as the operator after it is
slow. There is also high back-pressure on the 1st operator, which makes my
problem more severe (the out-of-orderness is high). If I could
throttle the 1st operator when back-pressure is high, I could mitigate
the problem, but I could not find any guide on doing that.

Could you please help?

Thanks.
Averell





Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

I wonder why you need to "source" your intermediate state from files? Why
not "source" it from the previous operator? I.e. instead of (A join B) ->
State -> files -> (C), why not do (A join B) -> State -> (files + C)?





Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

I'm not sure what you meant by "query it using SQL". Do you mean running
ad-hoc SQL queries on that joined data? If that's what you meant, then
you'll need some SQL server first, and then write the joined data to that SQL
server. Elasticsearch and Cassandra are ready-to-use options. Writing a
custom sink function to write to your own SQL server is also a
not-so-difficult solution.

Best regards,
Averell





Re: Preserving (best effort) messages order between operators

2019-11-01 Thread Averell
Hi Yun,

I found the cause of the issue:
the ContinuousFileReaderOperator (my operator B) uses a PriorityQueue,
which maintains a buffer sorted by modification time, so my records were re-ordered.
I don't understand the reason for using a PriorityQueue instead of an
ordinary queue though.

Thanks.
Averell 





S3 file source - continuous monitoring - many files missed

2018-07-23 Thread Averell
Good day everyone,

I have a Flink job that has an S3 folder as a source, and we keep putting
thousands of small gzip files (around 1 KB each) into that folder, at a
rate of about 5,000 files per minute. Here is how I created that source in
Scala:

    val my_input_format = new TextInputFormat(new org.apache.flink.core.fs.Path(my_path))
    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
    my_input_format.setNestedFileEnumeration(true)

    val my_raw_stream = streamEnv
      .readFile(my_input_format,
        my_path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000)

The problem is, with the monitoring interval of 1,000 ms as above, about 20%
of the files are missed. In the Apache Flink dashboard, at the subsequent
operators, I can only see ~80% of the total number of files
("Records sent" column).

If I increase the monitoring interval, the number of missed files goes down:
at 5,000 ms it is about 10%, and at 30,000 ms only about 2% are missed.

No WARNING/ERROR is recorded though.

I could not reproduce this on HDFS, as I could not reach such a high
file-writing rate in our cluster.

Could someone please help? Thank you very much.





Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hi Jörn,
Thanks. I had missed that EMRFS strong consistency configuration; I will try
it now.
We also have a backup solution - using Kinesis instead of S3 (I don't see
Kinesis in your suggestion, but I hope it would be alright).

Regarding "The small size and high rate is not suitable for S3 or HDFS":
are there any guidelines on how big the files should be
before we should consider S3/HDFS?

Thanks a lot. 





Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Just an update: I tried enabling the "EMRFS Consistent View" option, but it
didn't help. I'm not sure whether that's what you recommended, or something
else.

Thanks!





Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Could you please explain in more detail what you meant by "try read-after-write
consistency (assuming the files are not modified)"?
I guess that the problem I'm seeing comes from inconsistency in S3 file
listing; otherwise, I would have got file-not-found exceptions.

My use case is to read output files from another system. That system was
built some years back and writes files to its S3 bucket. There are
no file modifications; only new files are created. We want to avoid
modifying that system.





Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hello Jörn.

Thanks for your help.
"/Probably the system is putting them to the folder and Flink is triggered
before they are consistent./" <<< yes, I also guess so. However, if Flink is
triggered before they are consistent, either (a) there should be some error
messages, or (b) Flink should be able to identify those files in the
subsequent triggers. But in my case, those files are missed forever.

Right now those files for S3 are to be consumed by Flink only. The flow is
as follow:
   Existing system >>> S3 >>> Flink >>> Elastic Search.
If I cannot find a solution to the mentioned problem, I might need to change
to:
   Existing system >>> Kinesis >>> Flink >>> Elastic Search
Or
   Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic Search  
Or
   Existing system >>> S3 >>> Custom File Source + Flink >>> Elastic
Search
However, all those solutions would take much more effort.

Thanks!






Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Thank you Fabian.

I implemented a quick test based on what you suggested - applying an
offset from the system time - and I did get an improvement: with an offset of
500 ms the problem is completely gone. With an offset of 50 ms, I still got
around 3-5 files missed out of 10,000. This difference might come from clock
skew between the EC2 instance and S3.

I will now try to implement exactly what you suggested, and open a Jira
issue as well.

Thanks for your help.





Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hello Fabian,

I created the JIRA issue: https://issues.apache.org/jira/browse/FLINK-9940
By the way, I have one more question: is it worth checkpointing that list of
processed files? Does the current implementation of the file source guarantee
exactly-once?

Thanks for your support.






Re: S3 file source - continuous monitoring - many files missed

2018-07-25 Thread Averell
Thank you Fabian for the guidance on implementing the fix.

I'm not quite clear about the best practice for JIRA tickets. I
changed its priority to Major, as you said it is important.
What happens next with that issue? Will someone (anyone) pick it up
and create a fix, to be included in a following release?

Thanks!





Re: S3 file source - continuous monitoring - many files missed

2018-07-30 Thread Averell
Here is my implementation of the change: https://github.com/lvhuyen/flink.
Three files were updated: StreamExecutionEnvironment.java,
StreamExecutionEnvironment.scala, and ContinuousFileMonitoringFunction.java.

All thanks to Fabian.





Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi everyone,

We are collecting log files from tens of thousands of network nodes, and we
need to derive some data insights from them. The files come with the
corresponding node ID in the file name, and I want to do custom partitioning
using that node ID.
Right now (with Flink 1.5) I think that is not supported. I have been trying
to look into the code, but it will take me some time to understand it.
From the GUI, it looks like the first step of the file source (directory
monitoring) rebalances the stream to the 2nd step (file reader), and as
per the Flink documentation, rebalancing means round-robin. However, I could not find
a call to the "rebalance" method; instead, "transform" is called, and there is
not much information about that "transform" method.

Would it be possible to ask for some guidance on this?
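To make the question concrete, this is a sketch of the kind of partitioning I am
after, done after the source instead of inside it (lines stands for the
DataStream[String] coming out of readFile, and extractNodeId is a hypothetical
helper that derives the node ID for a record):

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.streaming.api.scala._

    // Hypothetical helper: derive the node ID for a record
    def extractNodeId(line: String): String = line.split(',')(0)

    val partitioned = lines.partitionCustom(
      new Partitioner[String] {
        // Pin each node ID to a fixed downstream subtask
        override def partition(key: String, numPartitions: Int): Int =
          math.abs(key.hashCode) % numPartitions
      },
      line => extractNodeId(line))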

Thanks for your help.
Averell





Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thank you Vino.

Yes, I went through that official guide before posting this question. The
problem is that I could not see a call to any of the mentioned
partitioning methods (partitionCustom, shuffle, rebalance, rescale, or
broadcast) in the original readFile function. I'm still trying to look into
the code.
There should always be a way to do it, but I hope that you / someone can
help me with the easiest way - some kind of small customization at the very
place where "directory monitoring" hands over the file splits to the "file
reader".

Thanks!





Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thanks Vino.

Yes, I can do that after the source function. But that means the data would be
shuffled - sent from every source subtask to the right partition.
I think that doing the partitioning from within the file source would save
that shuffle.

Thanks.
Averell.





Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Oh, thank you Vino. I was not aware of the reshuffling after every custom
partitioning. Why would that be needed though?

Thanks and regards,
Averell





Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi Vino,

I'm a little bit confused.
If I do the partitioning from within the source function, using the same
hash function on the key to identify the partition, would that be sufficient
to avoid the shuffle in the subsequent keyBy call?

Thanks.
Averell





Re: Detect late data in processing time

2018-07-30 Thread Averell
Hi Soheil,

Why don't you just use the processing time as the event time? Simply override
extractTimestamp to return the current processing time.
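A minimal sketch of what I mean (stream and MyEvent are placeholders for your
pipeline and event type):

    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

    val withProcessingTimeAsEventTime = stream
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyEvent] {
        // Use the wall-clock time at extraction as the event timestamp
        override def extractAscendingTimestamp(element: MyEvent): Long =
          System.currentTimeMillis()
      })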

Regards,
Averell





Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Averell
Hi Fabian,

Thanks for the information. I will try to look at the change to that complex
logic you mentioned when I have time. That would save one more shuffle
(from 1 to 0), wouldn't it?

By the way, regarding fault tolerance in the file reader task, could you
explain what would happen if the reader task crashes in the middle of reading
one split? E.g. the split has 100 lines, and the reader crashes after
reading 30 lines. What happens when the operator is resumed? Would
those first 30 lines get reprocessed a 2nd time?

Those tens of thousands of files that I have are currently not in CSV
format. Each file has a heading section of 10-20 lines (common data for
the node), then a data section with one CSV line per record, then again
some common data, and finally a 2nd data section, again with one CSV line per
record.
My current solution is to write a non-Flink job to preprocess those files
and bring them to standard CSV format as the input for Flink.

I am thinking of doing this in Flink, with a custom file reader function
which works in a similar way to the wholeTextFiles function in Spark batch
processing. However, I don't know yet how to get fault tolerance when doing
that.

Thank you very much for your support.

Regards,
Averell 





Re: Small-files source - partitioning based on prefix of file

2018-08-07 Thread Averell
Thank you Fabian.
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./" 
I do not quite understand this statement. If I have read 30 lines from the
checkpoint and sent those 30 records to the next operator, then when the
streaming is recovered - resumed from the last checkpoint, the subsequent
operator would receive those 30 lines again, am I right?

Thanks!





Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Thank you Vino and Fabian for your help in answering my questions. As my
files are small, I think there would not be much benefit in checkpointing
the file offset state.

Thanks and regards,
Averell





Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Hi Fabian, Vino,

I have one more question, for which I initially planned to create a new thread,
but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having one
single-threaded process that reads the TarArchiveStream (which has been decompressed
from that tar.gz by Flink automatically), and then distributes the
TarArchiveEntry entries to a parallel operator which would process the
small files in parallel. If this is feasible, which elements of Flink
can I reuse?

Thanks a lot.
Regards,
Averell





Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Averell
Thank you Vino, Jörn, and Fabian.
Please forgive my ignorance, as I am still not able to fully
understand state/checkpointing and the statement that Fabian gave earlier:
"In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well."

My current understanding is that checkpointing is managed at the
execution-environment level, and that it happens at the same time in all the
operators of the pipeline. Is this true?
My concern here is how that synchronization is managed. It seems quite
possible that at different operators, checkpointing happens some
milliseconds apart, which would lead to duplicated or missed records,
wouldn't it?

I tried to read Flink's documentation about managing state here
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html>
. However, I have not been able to find the information I am looking for.
Please help point me to the right place.

Thanks and best regards,
Averell.





Re: Small-files source - partitioning based on prefix of file

2018-08-12 Thread Averell
Thank you Fabian.
It is clear to me now. Thanks a lot for your help.

Regards,
Averell





CoFlatMapFunction with more than two input streams

2018-08-14 Thread Averell
Hi,

I have stream_A of type "Dog", which needs to be transformed using data from
stream_C of type "Name_Mapping". As stream_C is a slow stream (its data is not
updated frequently), to do the transformation I connect the two streams,
do a keyBy, and then use a RichCoFlatMapFunction in which the mapping data from
stream_C is saved into state (flatMap1 generates one output record, while flatMap2
only updates the state table and does not generate any output).

Now I have another stream_B of type "Cat", which also needs to be
transformed using data from stream_C. After that transformation,
transformed_B will go through a completely different pipeline from
transformed_A.

I can see two approaches for this:
1. Duplicate stream_C and the RichCoFlatMapFunction and apply them to stream_B.
2. Create a new stream_D of type "Animal", transform it with stream_C, then split
the result into two streams using split/select with case class pattern
matching.

My question is: which option should I choose?
With option 1, at the very least I need to maintain two state tables, not to mention
the cost of duplicating the stream (I am not sure how expensive this is in terms of
resources) and the need to duplicate the CoFlatMapFunction (*).
With option 2, there is additional cost coming from unioning,
splitting/selecting, and type-casting on the final streams.
Is there any better option for me?

Thank you very much for your support.
Regards,
Averell

(*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
[Animal, Name_Mapping], but it cannot be used for a stream of [Dog,
Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
function as well.





Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you Vino & Xingcan.
@Vino: could you explain in more detail how a DBMS would be used? Would that be
via the Table API, or did you mean directly reading the DBMS data inside the
ProcessFunction?

@Xingcan: I don't know what the benefits of using CoProcessFunction over
RichCoFlatMapFunction are in this case.
Regarding the Either wrapper, as I understand it, I would need to use
it both in my sources (stream_A and stream_B) and in the
CoProcess/CoFlatMapFunction. In that case, using a superclass Animal would be more
convenient, wouldn't it?

Thanks and regards,
Averell





Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you Xingcan.
Regarding that Either, I still see the need for type casting / case class
matching. Could you please take a look?

    val transformed = dog
      .union(cat)
      .connect(transformer)
      .keyBy(r => r.name, r2 => r2.name)
      .process(new TransformerCoProcessFunction)
      .split(_ match {
        case Right(d) => List("dog")
        case Left(c) => List("cat")
        case _ => List("")
      })

    val transformed_dog = transformed.select("dog").map(_ match {
      case Right(d) => d
      case _ => NON_EXIST_DOG
    })
    val transformed_cat = transformed.select("cat").map(_ match {
      case Left(c) => c
      case _ => NON_EXIST_CAT
    })

Thanks!
Averell





Initial value of ValueStates of primitive types

2018-08-17 Thread Averell
Hi,

In Flink's documentation, I couldn't find any example that uses a primitive type
when working with state. What would be the initial value of a ValueState of
a type like Int/Boolean/...? The same question applies to a MapState of
[String, Int].

Thanks and regards,
Averell 





Re: Initial value of ValueStates of primitive types

2018-08-17 Thread Averell
Thank you Dominik.

So there's an implicit conversion, which means that getState().value() would
always give a deterministic result (i.e. a Boolean value would always be false,
an Int value would always be 0).

Another funny thing I found is that even with a reference type like Integer, there is
also that implicit conversion:

    val y: Integer = getRuntimeContext.getState(new
      ValueStateDescriptor[Int]("Triggered", classOf[Int])).value()
    // y = {Integer@5795} "0"
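
A small sketch of the null-handling subtlety, as I currently understand it: value()
returns null until something has been written, and it is the unboxing to a Scala
primitive that turns that null into 0/false. Keeping the boxed type makes the
"not set yet" case visible (to be used inside a rich function):

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

    // Keep the boxed java.lang.Boolean so that null ("never set") can be told apart from false
    lazy val triggeredState: ValueState[java.lang.Boolean] =
      getRuntimeContext.getState(
        new ValueStateDescriptor[java.lang.Boolean]("Triggered", classOf[java.lang.Boolean]))

    val triggered: Boolean = Option(triggeredState.value()).exists(_.booleanValue())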

Thanks for your time.
Regards,
Averell





Raising a bug in Flink's unit test scripts

2018-08-24 Thread Averell
Good day everyone,

I'm writing unit tests for the bug fix FLINK-9940, and found that some
existing tests in flink-fs-tests cannot detect the issue where the file
monitoring function emits duplicated files (i.e. the same file is reported
multiple times).
Can I just fix this as part of the FLINK-9940 bug fix, or do I have to raise
a separate bug?

Thanks and best regards,
Averell





Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread Averell
Hi Vino,

Regarding the statement "Checkpoints are taken automatically and are used
for automatically restarting a job in case of a failure": I do not quite
understand the definition of a failure, or how to simulate one while
testing my application. Possible scenarios I can think of:
   (1) the Flink application is killed
   (2) the cluster crashes
   (3) one of the servers in the cluster crashes
   (4) an unhandled exception is raised when abnormal data is received
   ...

Could you please help explain?

Thanks and regards,
Averell





Re: Can I only use checkpoints instead of savepoints in production?

2018-08-26 Thread Averell
Thank you Vino.

I sometimes get an error message like the one below. It looks like my
executors got overloaded. Here I have another question: is there any
existing solution that allows the job to be restarted automatically?
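
What I have found so far is the restart strategy configuration - a sketch, assuming
checkpointing is enabled so that Flink can restart the job from the last checkpoint
on such failures:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time

    // Retry up to 3 times, waiting 10 seconds between attempts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))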

Thanks and best regards,
Averell







Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Thank you Vino. 

I had put the message inside a tag, and I don't know why it was not shown in the
email thread. I have pasted the error message below in this email.

Anyway, it seems that was an issue with enabling checkpointing. Now I am
able to get it turned on properly, and my job is restored
automatically.
I am testing my scenarios now. I found some issues, and I think it
would be better to ask about them in a separate thread.

Thanks and regards,
Averell

=
org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 457d8f370ef8a50bb462946e1f12b80e)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:661)
..
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id container_1535279282999_0032_01_13 timed out.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Hi Vino,

Could you please tell me where I can find the JM and TM logs? I'm running on
AWS EMR using YARN.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Small-files source - partitioning based on prefix of file

2018-08-27 Thread Averell
Hello Fabian, and all,

Please excuse me for digging this old thread up.
I have a question regarding the sending of the "barrier" messages in Flink's
checkpointing mechanism: I want to know when those barrier messages are sent
when I am using a file source. Where can I find this in the source code?

I'm still working on my 20,000 small files issue, where all of those 20,000
files appear to the ContinuousFileMonitoringFunction at the same time.
It takes only a few seconds to list all those files, but it is expected
to take about 5 minutes to have those 20K files processed through to my sink.
Due to a resource limitation issue, my job fails after about 3 minutes.
What happens after that is the job crashes, gets restored, tries to
process all 20K files from file 1 again, and ultimately fails again after 3
minutes... It goes into an infinite loop.

I think that this is the expected behaviour, as my current checkpoint config
is to checkpoint every 10s, and it took only a second or two for the listing
of those 20K files. Am I correct here? And do we have a solution for this?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Thank you, Vino.
I found it, http://:8088/ 

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Averell
Hello Fabian,

Thanks for the answer. However, my question is a little bit different.
Let me rephrase my example and my question:
 * I have 10,000 unsplittable small files to read, which, in total, have
about 10M output lines.
 * From Flink's reporting web GUI, I can see that CFMF and
ContinuousFileReaderOperator (CFRO) are reported separately.
- CFMF needs about 10 seconds to generate all 10,000 records (as
you said, in this case, 1 record = 1 file split).
- CFRO generates about 2M records per minute (which means CFRO
is processing at the rate of 2,000 files per minute)
 * I set the checkpointing interval = 1 minute.
In this example, /will the 1st barrier be injected into the stream of
file-splits 50 seconds after the 10,000th split, or after the 2,000th one?/

Sorry for being confusing.

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Averell
Thank you Fabian.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


ElasticSearch 6 - error with UpdateRequest

2018-08-30 Thread Averell
Good day everyone,

I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
error:

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)

Below is my ElasticsearchSinkFunction:

import org.elasticsearch.action.update.UpdateRequest

def upsertRequest(element: T): UpdateRequest = {
  new UpdateRequest(
      "myIndex",
      "record",
      s"${element.id}")
    .doc(element.toMap())
}

override def process(element: T, runtimeContext: RuntimeContext,
                     requestIndexer: RequestIndexer): Unit = {
  requestIndexer.add(upsertRequest(element))
}

What could be the issue here?

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearch 6 - error with UpdateRequest

2018-08-31 Thread Averell
Hi Timo,

Thanks for your help. I don't get that error anymore after putting that file
into my project.
However, I don't understand how that helps. I have been using a Flink binary
built on the same laptop, so how could it make a difference whether that Java
class is in the Flink project or in my project?
If you have some spare time, please help explain.

I would also like to know the other way to fix that issue (the one you
implemented in your branch).

Thanks a lot for your help.
Regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Promethus - custom metrics at job level

2018-09-02 Thread Averell
Hi everyone,

I am trying to publish some counters and meters from my Flink job, to be
scraped by a Prometheus server. It seems to me that all the metrics I am
publishing are reported at the task level, so my Prometheus server needs
to be configured to scrape many targets (the number being equivalent to my
max parallelism), and after that I need to do aggregation on the Prometheus
server to get the numbers for my whole job.

My question is: is it possible to have metrics at the job level? And can I
have one single Prometheus target to scrape the data from?
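
(For context, a minimal sketch of how such a counter is typically registered;
since it is registered in open() of each parallel subtask, every subtask
reports its own value, which is why the metrics show up per task. The function
and metric names are illustrative only:)

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMapper extends RichMapFunction[String, String] {

  @transient private var recordsSeen: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Registered once per parallel subtask, so Prometheus ends up with one
    // time series per subtask rather than a single job-level value.
    recordsSeen = getRuntimeContext.getMetricGroup.counter("recordsSeen")
  }

  override def map(value: String): String = {
    recordsSeen.inc()
    value
  }
}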

I found this
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Service-discovery-for-Prometheus-on-YARN-td21988.html,
which looks like a "no" answer to my question. However, I still hope for
some easy to use solution.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Promethus - custom metrics at job level

2018-09-03 Thread Averell
Thank you Reza. I will try your repo first :)

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Logging metrics from within Elasticsearch ActionRequestFailureHandler

2018-09-12 Thread Averell
Good day everyone,

I'm writing to Elasticsearch, and I need to count the number of records that
the process failed to write. The problem I'm facing is that there is no
RuntimeContext that I can access from within
o.a.f.s.c.elasticsearch.ActionRequestFailureHandler's onFailure method, so I
cannot call the getMetricGroup method.

What can I do in this case?
I see one option: rewriting ElasticsearchSink/ElasticsearchSink.Builder to
override the ElasticsearchSinkBase.open() method. But that looks like too much
boilerplate code - the designers of ElasticsearchSink.Builder must have had
some better solution in mind.
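
(For illustration, a sketch of such a failure handler; because there is no
RuntimeContext/metric group available inside onFailure, the counter below is
just a plain JVM counter rather than a Flink metric. Names are made up:)

import java.util.concurrent.atomic.AtomicLong

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.elasticsearch.action.ActionRequest

object CountingFailureHandler {
  // One plain counter per task-manager JVM - not visible to Flink's metric system.
  val failedRecords = new AtomicLong()
}

class CountingFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(action: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    CountingFailureHandler.failedRecords.incrementAndGet()
    // Re-adding the request via `indexer` would retry it; rethrowing `failure`
    // would fail the sink instead of swallowing the error.
  }
}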

Thanks and best regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Logging metrics from within Elasticsearch ActionRequestFailureHandler

2018-09-13 Thread Averell
Thank you Dawid.
I will open a Jira Wish for this.

Regarding a solution for this, I have another question about the current
implementation of the ElasticsearchSink classes: why do we use the builder
mechanism? Doesn't that make it more inconvenient to solve issues like the one
I'm having?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Utilising EMR's master node

2018-09-16 Thread Averell
Hello everyone,

I'm trying to run Flink on AWS EMR following the guides from the Flink doc
<https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn>
and from AWS
<https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-configure.html>
, and it looks like the EMR master node is never used, neither for the JM nor
for a TM. "bin/yarn-session.sh -q" only shows the core nodes. We are only
running Flink on that EMR cluster, so this is a waste of resources.

So, is there any way to use the master node for the job, at least for the JM
only?

If that is not possible, should I have different hardware configurations for
the master node and the core nodes (a smaller server for the master)?

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Utilising EMR's master node

2018-09-17 Thread Averell
Thank you Gary.

Regarding the option to use a smaller server for the master node: when
starting a Flink job, I get an error like the following:

/Caused by: org.apache.flink.configuration.IllegalConfigurationException:
*The number of virtual cores per node were configured with 16 but Yarn only
has 4 virtual cores available*. Please note that the number of virtual cores
is set to the number of task slots by default unless configured in the Flink
config with 'yarn.containers.vcores.'/

To get around that error, I need to start the job from one of the core nodes.
Is that the expected behaviour?
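
(The error message itself points at the relevant knob; a minimal
flink-conf.yaml entry that decouples the requested vcores from the number of
task slots would be, for example:)

yarn.containers.vcores: 4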

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Utilising EMR's master node

2018-09-19 Thread Averell
Hi Gary,
Thanks for your help.

Regarding TM configurations, in terms of performance, when my 2 servers have
16 vcores each, should I have 2 TMs with 16GB of memory and 16 task slots each,
or 8 TMs with 4GB of memory and 4 task slots each?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Strange behaviour with checkpointing and custom FilePathFilter

2018-09-20 Thread Averell
Good day everyone,

I have about 100 thousand files to read, and a custom FilePathFilter with a
simple filterPath method defined as below (the custom part is only to check
file-size and skip files with size = 0) 
override def filterPath(filePath: Path): Boolean = {
  filePath == null ||
    filePath.getName.startsWith(".") ||
    filePath.getName.startsWith("_") ||
    filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
      def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
      !fileStatus.isDir && fileStatus.getLen == 0
    }
}

It runs fine either when I disable checkpointing or when I use the
default FilePathFilter. It takes about 7 minutes to finish processing all
files (from source to sink).
However, when I have both the custom filter and checkpointing, it usually
takes 15-20 minutes for Flink to start reading my files (in the Flink GUI, the
Custom File Source monitor generates 0 records during that 15-20 minute
period).

Could someone please help with this? 

Thank you very much for your time.
Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-20 Thread Averell
Hi Vino,

I am using a custom FileInputFormat, but the mentioned problem only occurs
when I use a custom FilePathFilter.

My whole file for that custom FilePathFilter is quoted below.

Regarding enabling DEBUG: which classes/packages should I turn DEBUG on for?
I am afraid that turning DEBUG on at the global level would be too heavy.

Thanks and regards,
Averell


==
import java.util.Date

import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.core.fs.Path
import org.slf4j.LoggerFactory

object SdcFilePathFilter {
  private val TIME_FORMAT = new java.text.SimpleDateFormat("MMdd hhmm")
  private val LOG = LoggerFactory.getLogger(classOf[SdcFilePathFilter])
}

class SdcFilePathFilter(lookBackPeriod: Long, homePath: Path) extends FilePathFilter {
  private val homeDepth = homePath.depth()

  override def filterPath(filePath: Path): Boolean = {
    filePath == null ||
      filePath.getName.startsWith(".") ||
      filePath.getName.startsWith("_") ||
      filePath.getName.contains(FilePathFilter.HADOOP_COPYING) ||
      !(filePath.getName.endsWith(".tar.gz") ||
        filePath.getName.matches("""^\d{8}$""") ||
        (filePath.getName.matches("""^\d{4}$""") &&
          SdcFilePathFilter.TIME_FORMAT.parse(
            s"${filePath.getParent.getName} ${filePath.getName}").getTime <
            new Date().getTime - lookBackPeriod))
  }
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-20 Thread Averell
Please refer to this version:
===

import java.util.Date

import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.core.fs.Path
import org.slf4j.LoggerFactory

object SdcFilePathFilter {
  private val TIME_FORMAT = new java.text.SimpleDateFormat("MMdd hhmm")
  private val LOG = LoggerFactory.getLogger(classOf[SdcFilePathFilter])
}

class SdcFilePathFilter(lookBackPeriod: Long) extends FilePathFilter {
  override def filterPath(filePath: Path): Boolean = {
    filePath == null ||
      filePath.getName.startsWith(".") ||
      filePath.getName.startsWith("_") ||
      filePath.getName.contains(FilePathFilter.HADOOP_COPYING) ||
      !(filePath.getName.endsWith(".tar.gz") ||
        filePath.getName.matches("""^\d{8}$""") ||
        (filePath.getName.matches("""^\d{4}$""") && {
          try {
            SdcFilePathFilter.TIME_FORMAT.parse(
              s"${filePath.getParent.getName} ${filePath.getName}").getTime <
              new Date().getTime - lookBackPeriod
          } catch {
            case e: Throwable =>
              SdcFilePathFilter.LOG.warn(
                "Unknown exception happens while checking folder eligibility: {}",
                e.getStackTrace)
              true
          }
        }))
  }
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-23 Thread Averell
Hi Vino, and all,

I tried to avoid the step of getting the file status, and found that the
problem is not there any more. I guess doing that for every single file out of
100K+ files on S3 caused some issue with checkpointing.
I'm still trying to find the cause, but with lower priority now.

Thanks for your help.

Regards,
Averell   



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-24 Thread Averell
Hi Kostas,

Yes, applying the filter to the 100K files takes time, and the delay of 15
minutes I observed was definitely caused by that big number of files and the
cost of each individual file status check. However, the delay is much
smaller when checkpointing is off.
Within that 15 minutes, the checkpointing process is not triggered, though.

Thanks and regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Scheduling sources

2018-09-24 Thread Averell
Hi everyone,

I have 2 file sources which I want to start reading in a specified
order (e.g. source2 should only start 5 minutes after source1 has started).
I could not find any Flink document mentioning this capability, and I also
tried to search the mailing list, without any success.
However, in the Flink GUI there's a Timeline tab which shows the start time of
each operator, and this gives me hope that there is something that can help
with my requirement.
(http://localhost:20888/proxy/application_1537700592704_0026/#/jobs/0360094da093e36299273329f9dec19d/timeline)

Could you please give some advice?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-24 Thread Averell
Hi Kostas,

I use PROCESS_CONTINUOUSLY mode and a checkpoint interval of 20 minutes. When
I said "Within that 15 minutes, checkpointing process is not triggered though"
in my previous email, I was not complaining that checkpointing is not running,
but pointing out that the slowness is not due to an ongoing checkpoint.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-25 Thread Averell
Thank you Kostas for spending time on my case.

Related to the issue I mentioned, I have another issue caused by having a
lot of files to list. From the error message, I understand that the listing was
taking more than 30s, and the JM thought that it hung and killed it. Is it
possible to increase this 30s timer?

Thanks and regards,
Averell


2018-09-25 12:01:13.222 [Canceler/Interrupts for Source: Custom File Source
(1/1) (a5f5434070044510eafc9103bc24af43).] WARN 
org.apache.flink.runtime.taskmanager.Task  - Task 'Source: Custom File
Source (1/1)' did not react to cancelling signal for 30 seconds, but is
stuck in method:
 java.net.URI$Parser.scan(URI.java:2998)
java.net.URI$Parser.parseAuthority(URI.java:3138)
java.net.URI$Parser.parseHierarchical(URI.java:3097)
java.net.URI$Parser.parse(URI.java:3053)
java.net.URI.<init>(URI.java:746)
org.apache.hadoop.fs.Path.makeQualified(Path.java:467)
org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:464)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$$Lambda$63/515305348.apply(Unknown
Source)
com.amazon.ws.emr.hadoop.fs.s3n.BasicFileStatusFactory.newFile(BasicFileStatusFactory.java:69)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.newFile(S3NativeFileSystem.java:1154)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:962)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:914)
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:364)
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:97)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:395)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:327)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:292)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Scheduling sources

2018-09-25 Thread Averell
Thank you Till.

My use case is like this: I have two streams; one is raw data (1), the
other is enrichment data (2), which in turn consists of two components:
initial enrichment data (2a), which comes from an RDBMS table, and
incremental data (2b), which comes from a Kafka stream. To ensure that (1)
gets enriched properly, I want to have (2a) loaded into memory
before starting to process (1).

Is there any workaround solution for me in this case?
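
(For illustration, a rough sketch of one possible workaround: buffer (1) in
keyed state until enrichment data for the key has arrived. The Raw, Enrichment
and Enriched types and the deviceId key are made up for the example:)

import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Raw(deviceId: String, payload: String)
case class Enrichment(deviceId: String, attributes: String)
case class Enriched(deviceId: String, payload: String, attributes: String)

class BufferingEnrichmentJoin extends CoProcessFunction[Raw, Enrichment, Enriched] {

  // Latest enrichment record seen for this key; null until (2a)/(2b) arrives.
  private lazy val enrichmentState = getRuntimeContext.getState(
    new ValueStateDescriptor[Enrichment]("enrichment", classOf[Enrichment]))

  // Raw records that arrived before any enrichment data for this key.
  private lazy val bufferedRaw = getRuntimeContext.getListState(
    new ListStateDescriptor[Raw]("buffered-raw", classOf[Raw]))

  override def processElement1(raw: Raw,
                               ctx: CoProcessFunction[Raw, Enrichment, Enriched]#Context,
                               out: Collector[Enriched]): Unit = {
    val enrichment = enrichmentState.value()
    if (enrichment == null) {
      // Enrichment not loaded yet for this key: park the record in state.
      bufferedRaw.add(raw)
    } else {
      out.collect(Enriched(raw.deviceId, raw.payload, enrichment.attributes))
    }
  }

  override def processElement2(enrichment: Enrichment,
                               ctx: CoProcessFunction[Raw, Enrichment, Enriched]#Context,
                               out: Collector[Enriched]): Unit = {
    enrichmentState.update(enrichment)
    // Flush anything that was buffered while waiting for the enrichment data.
    bufferedRaw.get().asScala.foreach { raw =>
      out.collect(Enriched(raw.deviceId, raw.payload, enrichment.attributes))
    }
    bufferedRaw.clear()
  }
}

(It would be used on the two streams connected and keyed by deviceId, e.g.
raw.connect(enrichment).keyBy(_.deviceId, _.deviceId).process(new BufferingEnrichmentJoin).)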

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Utilising EMR's master node

2018-09-26 Thread Averell
Thank you Gary.
Regarding your previous suggestion to change the configuration for the
number of vcores on the EMR master node, I tried it and found one
funny/bad behaviour, as follows:
 * hardware configuration: master node with 4 vcores + 8GB RAM, 2x executors
with 16 vcores + 32GB RAM each.
 * Flink launch parameters: -yn 2 -ys 16 -ytm 4g...
4 TMs were created, with 2 of them used (0 free slots) and the other two
unused (16 free slots). The bad thing is that most of the time the 2 free TMs
are on the same machine, and the two occupied ones are on the other machine.
If I don't change the Hadoop configuration, 4 TMs are still created, but the
occupied ones are always on two different servers.

I'm not sure whether that's an EMR issue, a YARN issue, or a Flink issue.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Scheduling sources

2018-09-26 Thread Averell
Hi Kostas,

So that means my (2a) will be broadcast to all TMs? Is it possible to
partition it instead, since I'm using a CoProcessFunction to join (1) with (2)?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Scheduling sources

2018-09-26 Thread Averell
Hi Tison,

"/setting a timer on schedule start and trigger (1) after a fixed delay/"
would be quite sufficient for me.
Looking forward to the change of that Jira ticket's status.

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-26 Thread Averell
Good day everyone,

I have a stream with two timestamps (ts1 and ts2) inside each record. My
event time is ts1. This ts1 is truncated to a quarter-hour (like 23:30,
23:45, 00:00, ...).
I want to report two metrics:
 1. A meter which counts number of records per value of ts1. (fig.1)
 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Meter.png>
 

 2. A histogram which shows the distribution of the difference between ts1
and ts2 within each record (fig.2)
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/heatmap_histogram.png>
 

I'm using Prometheus with Grafana. Is it possible to do what I mentioned?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-27 Thread Averell
Hi Kostas,

Yes, I want them as metrics, as they are purely for monitoring purposes.
There's no need for fault tolerance.

If I use a side output, for example for metric no. 1, I would need a
tumbling AllWindowFunction, which, as I understand, would introduce some
delay to both the normal processing flow and the checkpoint process.

I already tried to follow the reference web page that you sent. However, I
could not figure out how to get what I want.
For example, with metric no. 1 - the meter: org.apache.flink.metrics.Meter only
provides markEvent(), which marks an event on that Meter. There is no option
to provide the event time, and processing time is always used. So my graph
is spread over time like the one below.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Meter2.png>
 

For metric no. 2 - the histogram: what I can see in Prometheus is the
calculated percentile values (0.5, 0.75, 0.9, 0.99, 0.999), which tells me, for
example, that 99% of the total number of records had ts1-ts2 <= 350s (which
looks more like a rolling average). But it doesn't tell me roughly what
percentage of records have a diff of 250ms, how many of 260ms, etc.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Histo2.png>
 

Thanks and regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-28 Thread Averell
Hello Kostas,

Thank you very much for the details. Also thanks for that "feel free to
disagree" (however, I don't have any desire to disagree here :) thanks a
lot).

Regarding that mainStream.windowAll, did you mean that the checkpointing of the
two branches (the main one and the monitoring one) will be separated? Is it
possible to disable checkpointing for that 2nd branch?

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi everyone,

I am trying to persist my stream into parquet files. In the documents, I can
see two different file sinks: BucketingSink (Rolling File Sink) and
StreamingFileSink, but I could not find any information regarding the
differences between these two types.
Which one should I choose for writing to parquet? Is it possible to
partition my output based on event time?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi,

https://issues.apache.org/jira/browse/FLINK-9749 <<< as per this ticket,
StreamingFileSink is a newer option, which is better than BucketingSink for
Parquet.
I would love to see an example of using it.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Difference between BucketingSink and StreamingFileSink

2018-10-04 Thread Averell
Hi,

Sorry for wasting your time. I found the solution for that question
regarding event-time: a class that extends BucketAssigner would do the
needful:

class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString: String)
    extends BucketAssigner[T, String] {

  @transient
  var dateFormatter = new SimpleDateFormat(formatString)

  override def getBucketId(in: T, context: BucketAssigner.Context): String = {
    if (dateFormatter == null) dateFormatter = new SimpleDateFormat(formatString)
    s"$prefix${dateFormatter.format(new java.util.Date(in.getTimestamp))}"
  }

  override def getSerializer = SimpleVersionedStringSerializer.INSTANCE
}

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Identifying missing events in keyed streams

2018-10-04 Thread Averell
Hi everyone, 

I have a keyed stream which expects events at a fixed interval (let's
say 1 minute). I want to raise alarms for any key which has received no
events in n periods. What would be the cheapest way (in terms of performance)
to do this?
I thought of some solutions, but don't know which one is the best:
1. Sliding window, then count the number of events in each window <<< this
seems quite expensive when n is big.
2. Register a timer for every single event, record the last event timestamp,
and check that timestamp when the timer expires. (This would be the best if
there were an option to cancel/modify a timer, but it seems that feature is not
available yet; see the sketch after this list.)
3. Session window: I haven't implemented this to verify its feasibility.
Thinking of firing the alarm on every window-clear event.
4. CEP. I don't know whether it's possible or not. I haven't found a guide for
defining patterns of missing events.
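
(A rough sketch of option 2, working around the lack of timer cancellation by
simply ignoring stale timers; the key/value types, names and the alarm message
are illustrative only:)

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class MissingEventAlarm(expectedIntervalMs: Long, periods: Int)
    extends KeyedProcessFunction[String, String, String] {

  // Processing time of the last event seen for the current key.
  private lazy val lastSeen = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long]))

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    lastSeen.update(now)
    // One timer per event; stale timers are not cancelled, just ignored in onTimer.
    ctx.timerService().registerProcessingTimeTimer(now + periods * expectedIntervalMs)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val last = lastSeen.value()
    // Raise the alarm only if no newer event arrived after this timer was set.
    if (last != null && timestamp >= last + periods * expectedIntervalMs) {
      out.collect(s"No events for key ${ctx.getCurrentKey} in the last $periods period(s)")
    }
  }
}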

Could you please give some advice?

Thanks and best regards, 
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-04 Thread Averell
Hi Fabian, Kostas,

From the description of this ticket,
https://issues.apache.org/jira/browse/FLINK-9753, I understand that my
output parquet file with StreamingFileSink will now span multiple checkpoints.
However, when I tried (as in the code snippet below), I still see that
one "part-X-X" file is created after each checkpoint. Is there any other
configuration that I'm missing?

BTW, I have another question regarding
StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
notes at the end of this page  StreamingFileSink
<https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html>
, bulk encoding can only be combined with OnCheckpointRollingPolicy, which
rolls on every checkpoint, so setting that check interval makes no
difference. Why, then, do we expose the withBucketCheckInterval method?

Thanks and best regards,
Averell

def buildSink[T <: MyBaseRecord](outputPath: String)
                                (implicit ct: ClassTag[T]): StreamingFileSink[T] = {
  StreamingFileSink
    .forBulkFormat(new Path(outputPath),
      ParquetAvroWriters.forReflectRecord(ct.runtimeClass))
    .asInstanceOf[StreamingFileSink.BulkFormatBuilder[T, String]]
    .withBucketCheckInterval(5L * 60L * 1000L)
    .withBucketAssigner(new DateTimeBucketAssigner[T]("-MM-dd--HH"))
    .build()
}
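
(For completeness, wiring the sink in is just the usual addSink call; a sketch
assuming a StreamExecutionEnvironment named env, a DataStream[MyBaseRecord]
named stream, and a made-up output path:)

// Bulk formats roll on checkpoint, so checkpointing must be enabled for
// in-progress part files to ever be finalised.
env.enableCheckpointing(10 * 60 * 1000L)
stream.addSink(buildSink[MyBaseRecord]("hdfs:///data/output"))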




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-05 Thread Averell
Hi Kostas,

Thanks for the info.
Just one more question regarding writing parquet: I need to write my stream
as parquet to S3. As per this ticket,
https://issues.apache.org/jira/browse/FLINK-9752, that is currently not
supported. Is there any ready-to-use solution that supports copying/moving
files from HDFS to S3 (something like a trigger from Flink after it has
finished writing to HDFS)?

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-05 Thread Averell
What a great news.
Thanks for that, Kostas.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

2018-10-05 Thread Averell
Hi Kostas,

I tried your PR - trying to write to S3 from Flink running on AWS - and I got
the following error. I copied the three jar files
flink-hadoop-fs-1.7-SNAPSHOT.jar, flink-s3-fs-base-1.7-SNAPSHOT.jar and
flink-s3-fs-hadoop-1.7-SNAPSHOT.jar to the lib/ directory. Do I need to make
any changes to the Hadoop configuration?

Thanks and best regards,
Averell

java.lang.Exception: unable to establish the security context
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1118)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


"unable to establish the security context" with shaded Hadoop S3

2018-10-05 Thread Averell
Hi everyone,

I'm trying a build of this  PR 6795
<https://github.com/apache/flink/pull/6795>   for the S3 recoverable writer, to
write my stream into parquet files on S3 with Flink running on AWS EMR, and I
get the error "unable to establish the security context" with the full
stacktrace below.
The shading of hadoop jars started from this ticket  FLINK-10366
<https://issues.apache.org/jira/browse/FLINK-10366>  . Googling the error
didn't help. Could someone please help me?

Thanks and best regards,
Averell

/Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
set.
java.lang.Exception: *unable to establish the security context*
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
*org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider*
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)
... 8 more
/




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Utilising EMR's master node

2018-10-06 Thread Averell
Hi Gary,

Thanks for the information. I didn't know that -yn is obsolete :( I am using
Flink 1.6.
I'm not sure whether that's a bug when I tried to set -yn explicitly, but I
started only 1 cluster.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

