Re: OOM error for heap state backend.
Thanks Andrey, My question is related to The FsStateBackend is encouraged for: - Jobs with large state, long windows, large key/value states. - All high-availability setups. How large is large state without any overhead added by the framework? Best, Vishwas On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin wrote: > Hi Vishwas, > > is this quantifiable with respect to JVM heap size on a single node >> without the node being used for other tasks ? > > > I don't quite understand this question. I believe the recommendation in > docs has the same reason: use larger state objects so that the Java object > overhead pays off. > RocksDB keeps state in memory and on disk in the serialized form. > Therefore it usually has a smaller footprint. > Other jobs in the same task manager can potentially use other state > backend depending on their state requirements. > All tasks in the same task manager share the JVM heap as the task manager > runs one JVM system process on the machine where it is deployed to. > > Best, > Andrey > > On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara > wrote: > >> Hi Andrey, >> Thanks for getting back to me so quickly. The screenshots are for 1GB >> heap, the keys for the state are 20 character strings(20 bytes, we don't >> have multi byte characters) . So the overhead seems to be quite large(4x) >> even in comparison to the checkpoint size(which already adds an overhead) . >> In this document [1] it says use FS/Heap backend for large states, is this >> quantifiable with respect to JVM heap size on a single node without the >> node being used for other tasks ? >> I have attached GC log for TM and JM >> >> >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend >> >> Best, >> Vishwas >> >> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin >> wrote: >> >>> Hi Vishwas, >>> >>> I believe the screenshots are from a heap size of 1GB? >>> >>> There are indeed many internal Flink state objects. 
They are overhead >>> which is required for Flink to organise and track the state on-heap. >>> Depending on the actual size of your state objects, the overhead may be >>> relatively large compared to the actual state size. >>> For example, if you just keep integers in your state then the overhead is >>> probably a couple of times larger than the state itself. >>> It is not easy to estimate the exact on-heap size without thorough analysis. >>> >>> The checkpoint has little overhead and includes only actual state data - >>> your serialized state objects, which are probably smaller than their heap >>> representation. >>> >>> So my guess is that the heap representation of the state is much bigger >>> compared to the checkpoint size. >>> >>> I also cc other people who might add more thoughts about on-heap state >>> size. >>> >>> You could also provide GC logs as Xintong suggested. >>> >>> Best, >>> Andrey >>> >>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara >>> wrote: >>> >>>> Hi Andrey and Xintong. 2.5 GB is from the flink web UI >>>> (checkpoint size). I took a heap dump and I could not find any memory leak >>>> from user code. I see similar behaviour with a smaller heap size: on a 1GB >>>> heap, the state size from the checkpoint UI is 180 MB. Attaching some >>>> screenshots of heap profiles if it helps. So when the state grows GC takes >>>> a long time and sometimes the job manager removes the TM slot because of a >>>> 1ms timeout and tries to restore the task in another task manager; this >>>> creates a cascading effect and affects other jobs running on the cluster. >>>> My tests were run in a single node cluster with 1 TM and 4 task slots with >>>> a parallelism of 4. >>>> >>>> Best, >>>> Vishwas >>>> >>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin >>>> wrote: >>>>> >>>>> Hi Vishwas, >>>>> >>>>> If you use Flink 1.7, check the older memory model docs [1] because >>>>> you referred to the new memory model of Flink 1.10 in your reference 2. 
>>>>> Could you also share a screenshot where you get the state size of 2.5 >>>>> GB? Do you mean Flink WebUI? >>>>> Generally, it is quite hard to estimate the on-heap size of state java >>>>> objects. I never heard about such a Flink metric.
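Andrey's point about Java object overhead can be made concrete with a JDK-only sketch (no Flink involved; the 16-byte figure is a typical estimate for a 64-bit JVM with compressed oops, not something measured from Flink's state backend): a plain int serializes to 4 bytes, while the boxed Integer a heap state backend would keep on-heap costs roughly four times that — before counting the map entries and queue nodes that hold it.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class StateOverheadSketch {

    // Size of an int state value in serialized (checkpoint-like) form.
    public static int serializedIntSize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(42); // any int serializes to 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Rough on-heap cost of the same value boxed as java.lang.Integer on a
    // 64-bit JVM with compressed oops: 12-byte object header + 4-byte value
    // field, padded to 16 bytes. An estimate, not a measurement.
    public static int onHeapIntegerEstimate() {
        return 16;
    }

    public static void main(String[] args) {
        System.out.println("serialized: " + serializedIntSize() + " bytes");           // 4
        System.out.println("boxed on-heap estimate: " + onHeapIntegerEstimate() + " bytes");
    }
}
```

This is why a 180 MB checkpoint can correspond to a multi-GB heap footprint: the checkpoint stores only the serialized values, while the heap also pays for headers, references, and the backing data structures.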
OOM error for heap state backend.
Hi guys, I use flink version 1.7.2 I have a stateful streaming job which uses a keyed process function. I use heap state backend. Although I set TM heap size to 16 GB, I get OOM error when the state size is around 2.5 GB(from dashboard I get the state size). I have set taskmanager.memory.fraction: 0.01 (which I believe is for native calls off heap). [1] . For an 8 GB TM heap setting , the OOM errors start showing up when the state size reaches 1 GB. This I find puzzling because I would expect to get a lot more space on the heap for state when I change the size to 16 GB, what fraction of the heap is used by the framework ?[2]. Below is the stack trace for the exception. How can I increase my state size on the heap ? 2020-08-21 02:05:54,443 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)] 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)] 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265] 2020-08-21 02:05:54,446 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED. 
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance(Array.java:75) at java.util.Arrays.copyOf(Arrays.java:3212) at java.util.Arrays.copyOf(Arrays.java:3181) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172) at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83) at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source) at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview Best, Vishwas
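For reference, the knobs discussed here look roughly like this in flink-conf.yaml under Flink 1.7's legacy memory model (values illustrative, mirroring the sizes mentioned above; if I read the legacy docs correctly, taskmanager.memory.fraction sizes Flink's managed memory, used mainly by batch operators, rather than off-heap native calls):

```yaml
# flink-conf.yaml (Flink 1.7 legacy memory model; values are illustrative)
taskmanager.heap.size: 16384m
# Fraction of memory reserved for Flink's managed memory; keeping it tiny
# leaves almost the whole heap to user code and keyed state.
taskmanager.memory.fraction: 0.01
```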
Re: No space left on device exception
Thank you Chesnay. Yes but I could change the staging directory by adding -Djava.io.tmpdir=/data/flink-1.7.2/tmp to *env.java.opts *in the flink-conf.yaml file. Do you see any problem with that? Best, Vishwas On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler wrote: > Could you try adding this to your flink-conf.yaml? > > s3.staging-directory: /usr/mware/flink/tmp > > On 20/08/2020 20:50, Vishwas Siravara wrote: > > Hi Piotr, > I did some analysis and realised that the temp files for s3 > checkpoints are staged in /tmp although the *io.tmp.dirs *is set to a > different directory. > > ls -lrth > > drwxr-xr-x. 2 was was 32 Aug 20 17:52 hsperfdata_was > -rw---. 1 was was 505M Aug 20 18:45 > presto-s3-8158855975833379228.tmp > -rw---. 1 was was 505M Aug 20 18:45 > presto-s3-7048419193714606532.tmp > drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root > [was@sl73rspapd031 tmp]$ > > flink-conf.yaml configuration > > io.tmp.dirs: /usr/mware/flink/tmp > > The /tmp has only 2GB, is it possible to change the staging directory for s3 > checkpoints ? > > Best, > > Vishwas > > > On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara > wrote: > >> Hi Piotr, >> Thank you for your suggestion. I will try that, are the temporary files >> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ? >> Would these files be the same size as checkpoints ? >> >> >> Thanks, >> Vishwas >> >> On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski >> wrote: >> >>> Hi, >>> >>> As far as I know when uploading a file to S3, the writer needs to first >>> create some temporary files on the local disks. I would suggest to double >>> check all of the partitions on the local machine and monitor available disk >>> space continuously while the job is running. 
If you are just checking the >>> free space manually, you can easily miss the point in time when those >>> temporary files are too big and approaching the available disk space, >>> as I'm pretty sure those temporary files are cleaned up immediately after >>> throwing this exception that you see. >>> >>> Piotrek >>> >>> On Thu, Aug 20, 2020 at 00:56 Vishwas Siravara >>> wrote: >>> >>>> Hi guys, >>>> I have a deduplication job that runs on flink 1.7, that has some state >>>> which uses FsState backend. My TM heap size is 16 GB. I see the below error >>>> while trying to checkpoint a state of size 2GB. There is enough space >>>> available in s3, I tried to upload larger files and they were all >>>> successful. There is also enough disk space in the local file system, the >>>> disk utility tool does not show anything suspicious. Whenever I try to >>>> start my job from the last successful checkpoint, it runs into the same >>>> error. Can someone tell me what is the cause of this issue? Many thanks. >>>> >>>> >>>> Note: This error goes away when I delete io.tmp.dirs and restart the >>>> job from last checkpoint, but the disk utility tool does not show much >>>> usage before deletion, so I am not able to figure out what the problem is. >>>> >>>> 2020-08-19 21:12:01,909 WARN >>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could >>>> not close the state stream for s3p://featuretoolkit.checkpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353. 
>>>> 1363 java.io.IOException: No space left on device >>>> 1364 at java.io.FileOutputStream.writeBytes(Native Method) >>>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326) >>>> 1366 at >>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) >>>> 1367 at >>>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) >>>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) >>>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158) >>>> 1370 at >>>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986) >>>> 1371 at >>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) >>>> 1372 at >>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.
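Putting the two fixes from this thread together, the resulting flink-conf.yaml entries would look like this (paths are the ones used in the thread):

```yaml
io.tmp.dirs: /usr/mware/flink/tmp
# Option suggested by Chesnay: an explicit staging directory for the
# presto-s3 filesystem, so uploads no longer stage under /tmp
s3.staging-directory: /usr/mware/flink/tmp
# Option Vishwas went with: move the JVM default temp dir, which the S3
# writer falls back to when no staging directory is configured
env.java.opts: -Djava.io.tmpdir=/data/flink-1.7.2/tmp
```

Either way the goal is the same: keep the ~500 MB presto-s3-*.tmp staging files off the 2 GB /tmp partition.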
Re: No space left on device exception
Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although the *io.tmp.dirs *is set to a different directory. ls -lrth drwxr-xr-x. 2 was was 32 Aug 20 17:52 hsperfdata_was -rw---. 1 was was 505M Aug 20 18:45 presto-s3-8158855975833379228.tmp -rw---. 1 was was 505M Aug 20 18:45 presto-s3-7048419193714606532.tmp drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root [was@sl73rspapd031 tmp]$ flink-conf.yaml configuration io.tmp.dirs: /usr/mware/flink/tmp The /tmp has only 2GB, is it possible to change the staging directory for s3 checkpoints ? Best, Vishwas On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara wrote: > Hi Piotr, > Thank you for your suggestion. I will try that, are the temporary files > created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ? > Would these files be the same size as checkpoints ? > > > Thanks, > Vishwas > > On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski > wrote: > >> Hi, >> >> As far as I know when uploading a file to S3, the writer needs to first >> create some temporary files on the local disks. I would suggest to double >> check all of the partitions on the local machine and monitor available disk >> space continuously while the job is running. If you are just checking the >> free space manually, you can easily miss a point of time when you those >> temporary files are too big and approaching the available disk space usage, >> as I'm pretty sure those temporary files are cleaned up immediately after >> throwing this exception that you see. >> >> Piotrek >> >> czw., 20 sie 2020 o 00:56 Vishwas Siravara >> napisał(a): >> >>> Hi guys, >>> I have a deduplication job that runs on flink 1.7, that has some state >>> which uses FsState backend. My TM heap size is 16 GB. I see the below error >>> while trying to checkpoint a state of size 2GB. There is enough space >>> available in s3, I tried to upload larger files and they were all >>> successful. 
There is also enough disk space in the local file system, the >>> disk utility tool does not show anything suspicious. Whenever I try to >>> start my job from the last successful checkpoint , it runs into the same >>> error. Can someone tell me what is the cause of this issue? Many thanks. >>> >>> >>> Note: This error goes away when I delete io.tmp.dirs and restart the >>> job from last checkpoint , but the disk utility tool does not show much >>> usage before deletion, so I am not able to figure out what the problem is. >>> >>> 2020-08-19 21:12:01,909 WARN >>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could >>> not close the state stream for s3p://featuretoolkit.c >>> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353. >>> 1363 java.io.IOException: No space left on device >>> 1364 at java.io.FileOutputStream.writeBytes(Native Method) >>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326) >>> 1366 at >>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) >>> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) >>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) >>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158) >>> 1370 at >>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986) >>> 1371 at >>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) >>> 1372 at >>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) >>> 1373 at >>> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) >>> 1374 at >>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) >>> 1375 at >>> 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269) >>> 1376 at >>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58) >>> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) >>> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) >>> 1379 at >>> org.apache.fli
Re: No space left on device exception
Hi Piotr, Thank you for your suggestion. I will try that, are the temporary files created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ? Would these files be the same size as checkpoints ? Thanks, Vishwas On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski wrote: > Hi, > > As far as I know when uploading a file to S3, the writer needs to first > create some temporary files on the local disks. I would suggest to double > check all of the partitions on the local machine and monitor available disk > space continuously while the job is running. If you are just checking the > free space manually, you can easily miss a point of time when you those > temporary files are too big and approaching the available disk space usage, > as I'm pretty sure those temporary files are cleaned up immediately after > throwing this exception that you see. > > Piotrek > > czw., 20 sie 2020 o 00:56 Vishwas Siravara > napisał(a): > >> Hi guys, >> I have a deduplication job that runs on flink 1.7, that has some state >> which uses FsState backend. My TM heap size is 16 GB. I see the below error >> while trying to checkpoint a state of size 2GB. There is enough space >> available in s3, I tried to upload larger files and they were all >> successful. There is also enough disk space in the local file system, the >> disk utility tool does not show anything suspicious. Whenever I try to >> start my job from the last successful checkpoint , it runs into the same >> error. Can someone tell me what is the cause of this issue? Many thanks. >> >> >> Note: This error goes away when I delete io.tmp.dirs and restart the job >> from last checkpoint , but the disk utility tool does not show much usage >> before deletion, so I am not able to figure out what the problem is. 
>> >> 2020-08-19 21:12:01,909 WARN >> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could >> not close the state stream for s3p://featuretoolkit.c >> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353. >> 1363 java.io.IOException: No space left on device >> 1364 at java.io.FileOutputStream.writeBytes(Native Method) >> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326) >> 1366 at >> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) >> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) >> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) >> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158) >> 1370 at >> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986) >> 1371 at >> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) >> 1372 at >> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) >> 1373 at >> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) >> 1374 at >> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) >> 1375 at >> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269) >> 1376 at >> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58) >> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) >> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) >> 1379 at >> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122) >> 1380 at >> 
org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185) >> 1381 at >> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84) >> 1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> 1383 at >> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50) >> 1384 at >> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47) >> 1385 at >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) >> 1386 at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> 1387 at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> 1388 at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> 1389 at
No space left on device exception
Hi guys, I have a deduplication job that runs on flink 1.7, that has some state which uses FsState backend. My TM heap size is 16 GB. I see the below error while trying to checkpoint a state of size 2GB. There is enough space available in s3, I tried to upload larger files and they were all successful. There is also enough disk space in the local file system, the disk utility tool does not show anything suspicious. Whenever I try to start my job from the last successful checkpoint, it runs into the same error. Can someone tell me what is the cause of this issue? Many thanks. Note: This error goes away when I delete io.tmp.dirs and restart the job from last checkpoint, but the disk utility tool does not show much usage before deletion, so I am not able to figure out what the problem is. 2020-08-19 21:12:01,909 WARN org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3p://featuretoolkit.checkpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353. 
1363 java.io.IOException: No space left on device 1364 at java.io.FileOutputStream.writeBytes(Native Method) 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326) 1366 at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158) 1370 at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986) 1371 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) 1372 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) 1373 at org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) 1374 at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) 1375 at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269) 1376 at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58) 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) 1379 at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122) 1380 at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185) 1381 at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84) 1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 1383 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50) 1384 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47) 1385 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) 1386 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 1387 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 1388 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 1389 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 1390 at java.lang.Thread.run(Thread.java:748) 1391 Suppressed: java.io.IOException: No space left on device 1392 at java.io.FileOutputStream.writeBytes(Native Method) 1393 at java.io.FileOutputStream.write(FileOutputStream.java:326) 1394 at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 1395 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 1396 at java.io.FilterOutputStream.close(FilterOutputStream.java:158) 1397 at java.io.FilterOutputStream.close(FilterOutputStream.java:159) 1398 ... 21 more Thanks, Vishwas
Re: Non parallel file sources
Thanks that makes sense. On Tue, Jun 23, 2020 at 2:13 PM Laurent Exsteens < laurent.exste...@euranova.eu> wrote: > Hi Nick, > > On a project I worked on, we simply made the file accessible on a shared > NFS drive. > Our source was custom, and we forced it to parallelism 1 inside the job, > so the file wouldn't be read multiple times. The rest of the job was > distributed. > This was also on a standalone cluster. On a resource managed cluster I > guess the resource manager could take care of copying the file for us. > > Hope this can help. If there would have been a better solution, I'm also > happy to hear it :). > > Regards, > > Laurent. > > > On Tue, Jun 23, 2020, 20:51 Nick Bendtner wrote: > >> Hi guys, >> What is the best way to process a file from a unix file system since >> there is no guarantee as to which task manager will be assigned to process >> the file. We run flink in standalone mode. We currently follow the brute >> force way in which we copy the file to every task manager, is there a >> better way to do this ? >> >> >> Best, >> Nick. >> > >
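Laurent's pattern — one non-parallel reader feeding parallel downstream workers — can be illustrated outside Flink with a single producer thread and a shared queue. This is a JDK-only sketch of the idea (not Flink API; in Flink the equivalent is a source with setParallelism(1) followed by distributed operators):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SingleReaderFanOut {

    private static final String EOF = "<EOF>"; // poison pill; assumes no real line equals it

    // One "source" feeds a queue; N workers consume and transform in parallel.
    public static List<String> process(List<String> lines, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> results = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    String line;
                    while (!(line = queue.take()).equals(EOF)) {
                        results.add(line.toUpperCase()); // stand-in for real processing
                    }
                    queue.put(EOF); // pass the poison pill on to the next worker
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (String l : lines) {
                queue.put(l); // the single, non-parallel reader
            }
            queue.put(EOF);
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "b", "c"), 2));
    }
}
```

The file is read exactly once regardless of how many workers process it, which is the property the thread is after.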
Re: Providing hdfs name node IP for streaming file sink
Thanks Yang. Going with setting the HADOOP_CONF_DIR in the flink application. It integrates neatly with flink. Best, Nick. On Mon, Mar 2, 2020 at 7:42 PM Yang Wang wrote: > It may work. However, you need to set your own retry policy (similar to > `ConfiguredFailoverProxyProvider` in hadoop). > Also if you directly use the namenode address and do not load the HDFS > configuration, some HDFS client configuration (e.g. > dfs.client.*) will not take effect. > > > Best, > Yang > > Nick Bendtner wrote on Mon, Mar 2, 2020 at 11:58 PM: > >> Thanks a lot Yang. What are your thoughts on catching the exception when >> a name node is down and retrying with the secondary name node ? >> >> Best, >> Nick. >> >> On Sun, Mar 1, 2020 at 9:05 PM Yang Wang wrote: >> >>> Hi Nick, >>> >>> Certainly you could directly use "namenode:port" as the scheme of your >>> HDFS path. >>> Then the hadoop configs (e.g. core-site.xml, hdfs-site.xml) will not be >>> necessary. >>> However, that also means you could not benefit from the HDFS >>> high-availability[1]. >>> >>> If your HDFS cluster is HA configured, I strongly suggest you set the >>> "HADOOP_CONF_DIR" >>> for your Flink application. Both the client and cluster(JM/TM) side need >>> to be set. Then >>> your HDFS path could be specified like this "hdfs://myhdfs/flink/test". >>> Given that "myhdfs" >>> is the name service configured in hdfs-site.xml. >>> >>> >>> Best, >>> Yang >>> >>> >>> >>> [1]. >>> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html >>> >>> Nick Bendtner wrote on Sat, Feb 29, 2020 at 6:00 AM: >>> To add to this question, do I need to setup env.hadoop.conf.dir to point to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/ for the jvm ? Or is it possible to write to hdfs without any external hadoop config like core-site.xml, hdfs-site.xml ? Best, Nick. On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner wrote: > Hi guys, > I am trying to write to hdfs from streaming file sink. 
Where should I > provide the IP address of the name node ? Can I provide it as a part of > the > flink-config.yaml file or should I provide it like this : > > final StreamingFileSink sink = StreamingFileSink > .forBulkFormat(hdfs://namenode:8020/flink/test, > ParquetAvroWriters.forGenericRecord(schema)) > > .build(); > > > Best, > Nick > > >
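A minimal sketch of Yang's recommendation, assuming an HA name service `myhdfs` is defined in the Hadoop configs (paths illustrative):

```yaml
# flink-conf.yaml: let Flink load core-site.xml / hdfs-site.xml so that
# dfs.client.* settings and the HA name service resolve on JM, TM and client
env.hadoop.conf.dir: /etc/hadoop/conf
```

The sink path then references the name service rather than a single namenode, e.g. hdfs://myhdfs/flink/test, and failover between namenodes is handled by the HDFS client instead of application code.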
Re: Exactly once semantics for hdfs sink
Hi Khachatryan, Thanks for your reply. Can you help me understand how it works with hdfs specifically , even a link to a document will help. Best, Vishwas On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi Vishwas, > > Yes, Streaming File Sink does support exactly-once semantics and can be > used with HDFS. > > Regards, > Roman > > > On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara > wrote: > >> Hi all, >> I want to use the StreamingFile sink for writing data to hdfs. Can I >> achieve exactly once semantics with this sink ? >> >> >> Best, >> HW. >> >
Exactly once semantics for hdfs sink
Hi all, I want to use the StreamingFile sink for writing data to hdfs. Can I achieve exactly once semantics with this sink ? Best, HW.
Unit testing filter function in flink
Hi guys, I want to test a function like:

private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !value.get(StipFields.processingCode).toString.equals("33")
      }
    }
  })
}

How can I do this? Best, Vishwas
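One common approach (a sketch, not tied to Flink's test harnesses): since FilterFunction.filter is a plain method, extract the predicate so it can be called directly with hand-built records. Below, a Map stands in for the Avro GenericRecord so the sketch stays JDK-only, and the field names and allowed-value sets are hypothetical stand-ins for the StipFields and ExecutionEnv members in the question:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FilterLogic {
    // Hypothetical stand-ins for ExecutionEnv.messageTypeList and pcrList
    static final Set<String> MESSAGE_TYPES = Set.of("0100", "0400");
    static final Set<String> PCRS = Set.of("pcr1");

    // The predicate from the FilterFunction, extracted so it can be unit
    // tested without a Flink cluster. Keys mirror the StipFields constants.
    public static boolean accept(Map<String, Object> record) {
        if (record == null || record.get("requestMessageType") == null) {
            return false;
        }
        return MESSAGE_TYPES.contains(record.get("requestMessageType").toString())
                && PCRS.contains(String.valueOf(record.get("pcrList")))
                && Integer.valueOf(0).equals(record.get("rejectCode"))
                && !"33".equals(String.valueOf(record.get("processingCode")));
    }

    // Convenience builder for test records.
    public static Map<String, Object> sample(String messageType, String pcr,
                                             int rejectCode, String processingCode) {
        Map<String, Object> r = new HashMap<>();
        r.put("requestMessageType", messageType);
        r.put("pcrList", pcr);
        r.put("rejectCode", rejectCode);
        r.put("processingCode", processingCode);
        return r;
    }

    public static void main(String[] args) {
        System.out.println(accept(sample("0100", "pcr1", 0, "10"))); // true
        System.out.println(accept(sample("0100", "pcr1", 0, "33"))); // false
        System.out.println(accept(null));                            // false
    }
}
```

In the real job the same idea applies in Scala: pull the body of filter into a named method taking a GenericRecord, then assert on it with records built in the test; the DataStream wiring itself needs no test.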
flink's hard dependency on zookeeper for HA
Hi all, I am using flink 1.7.2 as a standalone cluster in high availability mode with zookeeper. I have noticed that all flink processes go down once zookeeper goes down ? Is this expected behavior since the leader election has already happened and the job has been running for several hours. Best, Vishwas
Partitioning based on key flink kafka sink
Hi all, I am using flink 1.7.0 and using this constructor:

FlinkKafkaProducer(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig)

From the doc it says this constructor uses the fixed partitioner. I want to partition based on key, so I tried to use this:

public FlinkKafkaProducer(
    String defaultTopicId,
    KeyedSerializationSchema serializationSchema,
    Properties producerConfig,
    Optional<FlinkKafkaPartitioner<T>> customPartitioner)

What should I pass in the optional field? From the doc it says:

@param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not provided, records will be partitioned by the key of each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys are {@code null}, then records will be distributed to Kafka partitions in a round-robin fashion.

This is confusing (contradictory, in a way), since the docs for the previous constructor say that the fixed partitioner will be used if a custom partitioner is not present.

Best, Vishwas
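If I read the javadoc quoted above correctly, the resolution is that the two passages describe different constructors: the three-argument constructor defaults to Flink's fixed partitioner, while explicitly passing Optional.empty() to the four-argument constructor hands partitioning to the Kafka client, which hashes the serialized record key. The key-based behaviour can be sketched JDK-only (illustrative only — the real Kafka default partitioner uses murmur2 over the serialized key bytes, not String.hashCode()):

```java
public class KeyPartitionerSketch {
    // Illustrative key -> partition mapping. The point is only that a fixed
    // key always lands on the same partition, preserving per-key ordering.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String account : new String[] {"acct-1", "acct-2", "acct-1"}) {
            System.out.println(account + " -> " + partitionFor(account, 4));
        }
    }
}
```

Because the mapping is deterministic, all records for one account go to one partition, which is what key-based partitioning buys over the fixed partitioner.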
Re ordering events with flink
Hi guys, I want to know if it's possible to sort events in a Flink data stream. I know I can't sort an unbounded stream as a whole, but is there a way to buffer for a very short time and sort those buffered events before sending them to a sink? In our scenario we consume from a Kafka topic with multiple partitions, but the data is *not* partitioned by key (it's round robin). For example, we want to time-order transactions associated with a particular account, but since the same account number ends up in different partitions at the source, the same account's transactions land in different task managers and slots, and we cannot maintain event-time order in our stream processing. We do partition by account number when we send events to the downstream Kafka sink, so transactions from the same account end up in the same partition, but that is not good enough since the events are not sorted at the source. Any ideas are much appreciated. Best, Vishwas
Re: Consumer group and flink
Please ignore this email. On Wed, Oct 16, 2019 at 1:40 PM Vishwas Siravara wrote: > Hi guys, > Is it necessary to specify a consumer group name for a kafka streaming job > when checkpointing is enabled? Since the offsets are not stored in kafka > how does specifying a consumer group help ? > > Best, > Vishwas >
Consumer group and flink
Hi guys, is it necessary to specify a consumer group name for a Kafka streaming job when checkpointing is enabled? Since the offsets are not stored in Kafka, how does specifying a consumer group help? Best, Vishwas
Re: Flink restoring a job from a checkpoint
Hi Congxian, Thanks for getting back. Why would the streaming start from scratch if my consumer group does not change ? I start from the group offsets : env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source") So when I restart the job it should consume from the last committed offset to kafka isn't it ? Let me know what you think . Best, Vishwas On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu wrote: > Hi Vishwas > > Currently, Flink can only restore retained checkpoint or savepoint with > parameter `-s`[1][2], otherwise, it will start from scratch. > > ``` > checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs] > savepoint --> bin/flink run -s :savepointPath [:runArgs] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint > [2] > https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints > > Best, > Congxian > > > Vishwas Siravara 于2019年10月9日周三 上午5:07写道: > >> Hi Yun, >> Thanks for your reply. I do start from GROUP_OFFSET . Here is the code >> snippet : >> >> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka >> source") >> >> I have also enabled and externalized checkpointing to S3 . >> Why is it not recommended to just restart the job once I cancel it, as >> long as the topology does not change? What is the advantage of >> explicitly restoring from last checkpoint by passing the -s option to the >> flink command line if it does the same thing? For instance if >> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ >> is my last successful checkpoint, what is the difference between 1 and 2. >> >> 1. /usr/mware/flink/bin/flink run -d -C >> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main >> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming >> 2. 
/usr/mware/flink/bin/flink run -s >> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ >> -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main >> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming >> >> Thanks, >> Vishwas >> >> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang wrote: >> >>> Hi Vishwas >>> >>> If you did not configure your >>> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is >>> GROUP_OFFSET by default, which means "Start from committed offsets in ZK / >>> Kafka brokers of a specific consumer group". And you need to enable >>> checkpoint so that kafka offsets are committed when checkpoint completes. >>> >>> In other words, even if you don't resume from checkpoint, just enable >>> checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could >>> restore from last committed offset if previous checkpoint completed [1][2]. >>> However, this is not really recommended, better to resume from last >>> checkpoint [3] >>> >>> [1] >>> https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing >>> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to >>> [3] >>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint >>> >>> >>> Best >>> Yun Tang >>> >>> >>> -- >>> *From:* Vishwas Siravara >>> *Sent:* Wednesday, October 9, 2019 0:54 >>> *To:* user >>> *Subject:* Flink restoring a job from a checkpoint >>> >>> Hi guys, >>> I have a flink streaming job which streams from a kafka source. There is >>> no state in the job, just a simple filter , map and write to a kafka sink. >>> Suppose I stop my job and then submit the job again to the cluster with the >>> same consumer group, will the job restore automatically from the last >>> successful checkpoint , since this is what is the last committed offset to >>> kafka ? >>> >>> Thanks, >>> Vishwas >>> >>
Flink restoring a job from a checkpoint
Hi guys, I have a Flink streaming job which streams from a Kafka source. There is no state in the job, just a simple filter, map, and write to a Kafka sink. Suppose I stop my job and then submit it again to the cluster with the same consumer group: will the job restore automatically from the last successful checkpoint, since that corresponds to the last offset committed to Kafka? Thanks, Vishwas
Increasing number of task slots in the task manager
Hi guys, I get a Java heap space error when I have 1 GB of TM memory and 4 slots per TM (we have 4 cores in our lower environment); each slot has 1/4 GB of managed memory. From the Flink doc https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources I see that slots are allocated memory statically. When I change the TM memory to 8 GB, my job works fine without any heap issues with 4 slots, so each slot gets around 2 GB of heap. In another environment we have 60 cores. Does it make sense to have 60 slots in the task manager for 8 GB of TM heap? I assume that I will get a heap space error since each slot will have 8/60 GB of memory. Is my assumption correct? Thanks, Vishwas
High availability flink job
Hi guys, I have a Flink job running in standalone mode with a parallelism of >1 that produces data to a Kafka sink. My topic is replicated with a replication factor of 2. Now suppose one of the Kafka brokers goes down: will my streaming job fail? Is there a way I can continue processing until that broker node comes back up? The sink is also partitioned by a key. Thanks, Vishwas
Flink kafka producer partitioning scheme
Hi guys, from the Flink doc: *By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a FlinkFixedPartitioner that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up in the same Kafka partition).* Does this mean that if my downstream topic has 40 partitions, I will need 40 parallel subtasks? Thanks, Vishwas
externalizing config files for the flink class loader
I have a standalone cluster. I have added my own library (jar file) to the lib/ folder in Flink. I submit my job from the CLI after I start the cluster. Now I want to externalize a property file which has to be read by this library. Since this library is loaded by Flink's classloader and not the application classloader, I cannot supply it using flink run -C ..., since that works only for the user classloader. Thanks, Vishwas
Using FlinkKafkaConsumer API
I am using the flink-kafka-connector and this is my dependency: "org.apache.flink" %% "flink-connector-kafka" % *"1.7.0"*. When I look at my dependency tree, the Kafka client version is org.apache.kafka:kafka-clients:2.0.1, which comes from the above package. However, when I run my code in the cluster I see that the kafka-client that is loaded is 0.10.2.0. Here is the task executor log: 2019-09-09 03:05:56,825 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0. I am struggling to find out where this dependency is coming from; our broker version is not compatible with this client. How can I force Flink to use 2.0.1? The API I use for the Kafka consumer is:

private[flink] def sourceType: FlinkKafkaConsumer[GenericRecord] = {
  val consumer = new FlinkKafkaConsumer[GenericRecord](
    source.asJava,
    AvroDeserialization.genericRecd,
    ExecutionEnv.streamProperties)
  consumer
}

I really appreciate the help. Is there any way I can find out where this dependency comes from in the cluster, as this is clearly not coming from my application? Thanks, Vishwas
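Two things worth checking (a hedged sketch; verify against your own build and cluster). On the build side, sbt can pin a version and show which module pulls in the conflicting one; on the cluster side, an old kafka-clients jar sitting in Flink's lib/ folder would never show up in the application's dependency tree at all:

```scala
// build.sbt fragment (sketch): force a single kafka-clients version.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.0.1"
// Then inspect the tree with `sbt dependencyTree` (built in from sbt 1.4;
// older sbt needs the sbt-dependency-graph plugin).
```

If the override changes nothing at runtime, the 0.10.2.0 client is likely coming from a jar on the cluster's classpath (e.g. Flink's lib/ directory or a bundled connector) rather than from the application jar, since parent-first classloading can shadow the application's newer client.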
Re: understanding task manager logs
Thanks, I'll check it out. On Thu, Sep 5, 2019 at 5:22 AM Fabian Hueske wrote: > Hi Vishwas, > > This is a log statement from Kafka [1]. > Not sure how when AppInfoParser is created (the log message is written by > the constructor). > > For Kafka versions > 1.0, I'd recommend the universal connector [2]. > > Not sure how well it works if producers and consumers have different > versions. > Maybe Gordon (in CC) has some experience with that. > > Best, Fabian > > [1] > https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java#L117 > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector > > Am Di., 3. Sept. 2019 um 04:04 Uhr schrieb Vishwas Siravara < > vsirav...@gmail.com>: > >> Hi guys, >> I am using flink 1.7.2 and my application consumes from a kafka topic and >> publish to another kafka topic which is in its own kafka environment >> running a different kafka version,. I am using FlinkKafkaConsumer010 from >> this dependency >> *"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion. * >> >> In the task manager log I see these lines: >> >> 2019-09-02 02:57:59,840 INFO >> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully >> logged in. >> 2019-09-02 02:57:59,841 INFO >> org.apache.kafka.common.security.kerberos.KerberosLogin - >> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh >> thread started. 
>> 2019-09-02 02:57:59,842 INFO >> org.apache.kafka.common.security.kerberos.KerberosLogin - >> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid >> starting at: Mon Sep 02 02:57:59 GMT 2019 >> 2019-09-02 02:57:59,843 INFO >> org.apache.kafka.common.security.kerberos.KerberosLogin - >> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Mon >> Sep 02 12:57:59 GMT 2019 >> 2019-09-02 02:57:59,843 INFO >> org.apache.kafka.common.security.kerberos.KerberosLogin - >> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh >> sleeping until: Mon Sep 02 11:14:13 GMT 2019 >> 2019-09-02 02:57:59,919 WARN >> org.apache.kafka.clients.consumer.ConsumerConfig - The >> configuration 'zookeeper.connect' was supplied but isn't a known >> config.*2019-09-02 02:57:59,919 INFO >> org.apache.kafka.common.utils.AppInfoParser - Kafka >> version : 0.10.2.0 >> *2019-09-02 02:57:59,919 INFO org.apache.kafka.common.utils.AppInfoParser >> - Kafka commitId : 576d93a8dc0cf421 >> >> Here if you see the Kafka version is 0.10.2.0. Is this the version the >> broker is running or is this coming from flink ? I have forced the >> kafka-client version >> >> to be 2.2.0 >> >> "org.apache.kafka" % "kafka-clients" % "2.2.0" force() >> >> I also don't see 0.10.2.0 in the dependency tree of my build. >> >> Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ? What >> should I do if the consumer broker and producer broker are on different >> versions of kafka ? >> >> >> Thanks, >> >> Vishwas >> >> >> Thanks, >> >> Vishwas >> >> >> >>
Flink Kafka Connector
Hi guys, I am using the Flink connector for Kafka from 1.9.0. Here is my sbt dependency: "org.apache.flink" %% "flink-connector-kafka" % "1.9.0". When I check the log file I see that the Kafka version is 0.10.2.0. According to the docs, from 1.9.0 onwards the version should be 2.2.0. Why do I see this: 2019-09-06 01:41:43,534 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0? This creates a big problem: I can connect to the broker but I don't see any messages. Why is this? Thanks, Vishwas
understanding task manager logs
Hi guys, I am using flink 1.7.2 and my application consumes from a kafka topic and publish to another kafka topic which is in its own kafka environment running a different kafka version,. I am using FlinkKafkaConsumer010 from this dependency *"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion. * In the task manager log I see these lines: 2019-09-02 02:57:59,840 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in. 2019-09-02 02:57:59,841 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh thread started. 2019-09-02 02:57:59,842 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid starting at: Mon Sep 02 02:57:59 GMT 2019 2019-09-02 02:57:59,843 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Mon Sep 02 12:57:59 GMT 2019 2019-09-02 02:57:59,843 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh sleeping until: Mon Sep 02 11:14:13 GMT 2019 2019-09-02 02:57:59,919 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.*2019-09-02 02:57:59,919 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0 *2019-09-02 02:57:59,919 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421 Here if you see the Kafka version is 0.10.2.0. Is this the version the broker is running or is this coming from flink ? I have forced the kafka-client version to be 2.2.0 "org.apache.kafka" % "kafka-clients" % "2.2.0" force() I also don't see 0.10.2.0 in the dependency tree of my build. Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ? 
What should I do if the consumer broker and producer broker are on different versions of Kafka? Thanks, Vishwas
Re: Flink and kerberos
Thanks, I'll check it out. On Thu, Aug 29, 2019 at 1:08 PM David Morin wrote: > Vishwas, > > A config that works on my Kerberized cluster (Flink on Yarn). > I hope this will help you. > > Flink conf: > security.kerberos.login.use-ticket-cache: true > security.kerberos.login.keytab: /home/myuser/myuser.keytab > security.kerberos.login.principal: myuser@ > security.kerberos.login.contexts: Client > > Properties related to security passed as argument of the > FlinkKafkaConsumerXX constructor: > sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"myuser\" password=\"\";" > sasl.mechanism=PLAIN > security.protocol=SASL_SSL > > Le jeu. 29 août 2019 à 18:20, Vishwas Siravara a > écrit : > >> Hey David , >> My consumers are registered , here is the debug log. The problem is the >> broker does not belong to me , so I can’t see what is going on there . But >> this is a new consumer group , so there is no state yet . >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >> Consumer subtask 0 will start reading the following 40 partitions from the >> committed group offsets in Kafka: >> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', 
partition=34}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] >> >> On Thu, Aug 29, 2019 at 11:39 AM David Morin >> wrote: >> >>> Hello Vishwas, >>> >>> You can use a keytab if you prefer. 
You generate a keytab for your user >>> and then you can reference it in the Flink configuration. >>> Then this keytab will be handled by Flink in a secure way and TGT will >>> be created based on this keytab. >>> However, that seems to be working. >>> Did you check Kafka logs on the broker side ? >>> Or did you check consumer offsets with Kafka tools in order to validate >>> consumers are registered onto the different partitions of your topic ? >>> You could try to switch to a different groupid for your consumer group >>> in order to force parallel consumption.
Re: Flink and kerberos
Hey David , My consumers are registered , here is the debug log. The problem is the broker does not belong to me , so I can’t see what is going on there . But this is a new consumer group , so there is no state yet . org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] On Thu, Aug 29, 2019 at 11:39 AM David Morin wrote: > Hello Vishwas, > > You can use a keytab if you prefer. You generate a keytab for your user > and then you can reference it in the Flink configuration. > Then this keytab will be handled by Flink in a secure way and TGT will be > created based on this keytab. > However, that seems to be working. > Did you check Kafka logs on the broker side ? > Or did you check consumer offsets with Kafka tools in order to validate > consumers are registered onto the different partitions of your topic ? > You could try to switch to a different groupid for your consumer group in > order to force parallel consumption. > > Le jeu. 29 août 2019 à 09:57, Vishwas Siravara a > écrit : > >> I see this log as well , but I can't see any messages . I know for a fact >> that the topic I am subscribed to has messages as I checked with a simple >> java consumer with a different group. 
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >> Consumer subtask 0 will start reading the following 40 partitions from the >> committed group offsets in Kafka: >> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=
Re: Flink and kerberos
I see this log as well , but I can't see any messages . I know for a fact that the topic I am subscribed to has messages as I checked with a simple java consumer with a different group. org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara wrote: > Hi guys, > I am using kerberos for my kafka source. I pass the jaas config and > krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf > -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ > -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf > -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf > > When I look at debug logs I see that the consumer was created with the > following properties. 
> > 2019-08-29 06:49:18,298 INFO > org.apache.kafka.clients.consumer.ConsumerConfig - > ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [sl73oprdbd018.visa.com:9092] > check.crcs = true > client.id = consumer-2 > connections.max.idle.ms = 54 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > > > group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c) > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit >
Flink and kerberos
Hi guys, I am using Kerberos for my Kafka source. I pass the JAAS config and krb5.conf in env.java.opts:

-Dconfig.resource=qa.conf -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf

When I look at the debug logs I see that the consumer was created with the following properties:

2019-08-29 06:49:18,298 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [sl73oprdbd018.visa.com:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 54
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 30
    max.poll.records = 500
    metadata.max.age.ms = 30
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 3
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 6
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 1
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

I can also see that the Kerberos login is working fine. Here is the log for it:

2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh thread started.
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421

I then see this log:

INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)

The problem is that I do not see any error log, but there is no data being processed by the consumer, and it has been a nightmare to debug. Thanks for all the help.

Thanks,
Vishwas
Re: Loading dylibs
Yes, this is exactly what happens. As a workaround I created a small jar file which has code to load the dylib, and I placed it under the lib folder. This library is in provided scope in my actual job, so the dylib gets loaded only once, when the TM/JM JVM starts. What I found interesting in my old approach was that even when I check whether the dylib has already been loaded in the current thread, I still get the unsatisfied link error, even though that dylib is loaded in the task manager. On Wed, Aug 28, 2019 at 7:04 AM Aleksey Pak wrote: > Hi Vishwas, > > There is a known issue in the Flink Jira project [1]. > Is it possible that you have encountered the same problem? > > [1]: https://issues.apache.org/jira/browse/FLINK-11402 > > Regards, > Aleksey > > > On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara > wrote: > >> Hi Jörn, >> I tried that. Here is my snippet : >> >> String[] loadedlibs = >> getLoadedLibraries(Thread.currentThread().getContextClassLoader()); >> if(!containsVibeSimpleLib(loadedlibs)) { >> System.loadLibrary("vibesimplejava"); >> } >> >> Now I get the exception Unexpected errorjava.lang.UnsatisfiedLinkError: >> com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I which means >> that it could not find vibesimplejava in the loaded libs but I know that >> the if was not executed because vibesimplejava was present in loadedlibs( >> the control never went inside the if block. Any other suggestions? >> >> Thanks, >> Vishwas >> >> >> >> >> >> >> On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke >> wrote: >> >>> I don’t know Dylibs in detail, but can you call a static method where it >>> checks if it has been already executed and if not then it loads the library >>> (Singleton pattern)? 
>>> >>> Am 27.08.2019 um 06:39 schrieb Vishwas Siravara : >>> >>> Hi guys, >>> I have a flink application that loads a dylib like this >>> >>> System.loadLibrary("vibesimplejava"); >>> >>> >>> The application runs fine , when I restart the job I get this exception >>> : >>> >>> com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected >>> errorjava.lang.UnsatisfiedLinkError: Native Library >>> /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so >>> already loaded in another classloader >>> >>> This happens because the dylib has already been loaded once by the >>> taskmanger, how can I mitigate this? It seems problematic if two >>> applications are loading the same dylib. >>> >>> Thanks, >>> Vishwas >>> >>>
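The workaround described in this thread (a small jar in Flink's lib/ folder that loads the dylib exactly once per JVM) can be sketched as a static, idempotent loader. This is a hypothetical helper, not the actual code from the thread; in production the Runnable would be () -> System.loadLibrary("vibesimplejava"), but the guard logic is generic:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: guarantees the native-library load is attempted at
// most once per JVM (i.e. once per task manager), no matter how many jobs
// or user-code classloaders call it. Placing this class in Flink's lib/
// folder (and marking it "provided" in the job) keeps it in the parent
// classloader, avoiding "already loaded in another classloader".
public final class NativeLibLoader {
    private static final AtomicBoolean LOADED = new AtomicBoolean(false);

    private NativeLibLoader() {}

    // The loader is passed in as a Runnable so this sketch can be exercised
    // without the real dylib; returns true only for the call that loaded it.
    public static boolean loadOnce(Runnable loader) {
        if (LOADED.compareAndSet(false, true)) {
            loader.run();
            return true;
        }
        return false; // already loaded by an earlier call in this JVM
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        Runnable fakeLoad = () -> calls[0]++;
        boolean first = loadOnce(fakeLoad);
        boolean second = loadOnce(fakeLoad);
        System.out.println(first + " " + second + " " + calls[0]);
    }
}
```

The AtomicBoolean guard works only because the class itself lives in the parent classloader; the same guard inside the job jar would be re-created per job classloader, which matches the behavior observed in the thread.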
Re: Loading dylibs
Hi Jörn, I tried that. Here is my snippet : String[] loadedlibs = getLoadedLibraries(Thread.currentThread().getContextClassLoader()); if(!containsVibeSimpleLib(loadedlibs)) { System.loadLibrary("vibesimplejava"); } Now I get the exception Unexpected errorjava.lang.UnsatisfiedLinkError: com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I which means that it could not find vibesimplejava in the loaded libs but I know that the if was not executed because vibesimplejava was present in loadedlibs( the control never went inside the if block. Any other suggestions? Thanks, Vishwas On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke wrote: > I don’t know Dylibs in detail, but can you call a static method where it > checks if it has been already executed and if not then it loads the library > (Singleton pattern)? > > Am 27.08.2019 um 06:39 schrieb Vishwas Siravara : > > Hi guys, > I have a flink application that loads a dylib like this > > System.loadLibrary("vibesimplejava"); > > > The application runs fine , when I restart the job I get this exception : > > com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected > errorjava.lang.UnsatisfiedLinkError: Native Library > /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so > already loaded in another classloader > > This happens because the dylib has already been loaded once by the > taskmanger, how can I mitigate this? It seems problematic if two > applications are loading the same dylib. > > Thanks, > Vishwas > >
Loading dylibs
Hi guys, I have a flink application that loads a dylib like this:

System.loadLibrary("vibesimplejava");

The application runs fine, but when I restart the job I get this exception:

com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected errorjava.lang.UnsatisfiedLinkError: Native Library /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so already loaded in another classloader

This happens because the dylib has already been loaded once by the task manager. How can I mitigate this? It seems problematic if two applications are loading the same dylib. Thanks, Vishwas
Re: Using shell environment variables
You can also link at runtime by providing the path to the dylib by adding -Djava.library.path= in jvm options in the task manager On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu wrote: > Hi Abhishek, > > You need to export the environment variables on all the worker > machines(not the machine to submit the job). > > Alternatively, if you are submitting the job to a yarn cluster, you can > use flink conf prefix "containerized.taskmanager.env." to add environment > variables to Flink's task manager process. > For example for passing LD_LIBRARY_PATH as an env variable to the workers, > set: containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native" in > the flink-conf.yaml. > > Thanks, > Zhu Zhu > > Abhishek Jain 于2019年8月25日周日 上午2:48写道: > >> Hi Miki, >> Thanks for your reply. ParameterTool will only help in making the value >> accessible through ParameterTool.get(). However, I need a way of accessing >> the value using "System.getenv" since the underlying library uses it so. >> >> On Sat, 24 Aug 2019 at 23:04, miki haiat wrote: >> >>> Did you register your system environment parameter ? >>> >>> You can find here several ways to use configuration data [1] >>> >>> 1. >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html >>> >>> >>> On Sat, Aug 24, 2019, 20:26 Abhishek Jain wrote: >>> Hi! I am using a library that depends on a certain environment variable set (mandatorily). Now, I've exported this variable in my environment but somehow it's not being read by the task manager. 
Following is the exception I get when I try to run the job:

Caused by: com.example.MyCustomException: Env token is null
        at com.example.AerospikeSink.open(AerospikeSink.java:47)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)

Here's the code that throws this exception:

@Override
public void open(Configuration config) throws Exception {
    if (System.getenv("API_TOKEN") == null) {
        throw new MyCustomException("Env token is null");
    }
}

My question: Is there an alternative to System.getenv() that I can use to access environment variables inside of a flink task? ( P.S. I've only copied relevant code snippet to avoid confusion. I do intend to use API_TOKEN later on. ) -- Warm Regards, Abhishek Jain >>> >> >> -- >> Warm Regards, >> Abhishek Jain >> >
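If the lookup is under your control (it is not when a third-party library calls System.getenv() itself, in which case the variable must be exported on the worker machines as Zhu Zhu notes), one common alternative is to fall back to a JVM system property, which can be set per task manager via env.java.opts. A minimal sketch, with illustrative names:

```java
// Hypothetical work-around: prefer the environment variable, fall back to a
// -D system property of the same name. Only useful where you own the code
// that does the lookup.
public class EnvOrProperty {
    static String get(String name) {
        String v = System.getenv(name);
        return (v != null) ? v : System.getProperty(name);
    }

    public static void main(String[] args) {
        // Simulate "-DMY_FAKE_API_TOKEN_XYZ=dummy-token" on the command line.
        System.setProperty("MY_FAKE_API_TOKEN_XYZ", "dummy-token");
        // The env var is almost certainly absent, so the property wins.
        System.out.println(get("MY_FAKE_API_TOKEN_XYZ"));
    }
}
```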
Re: Use logback instead of log4j
Any idea on how I can use logback instead? On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara wrote: > Hi , > From the flink doc , in order to use logback instead of log4j " Users > willing to use logback instead of log4j can just exclude log4j (or delete > it from the lib/ folder)." > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html > . > > However when i delete it from the lib and start the cluster , there are no > logs generated , instead I see console log which says "Failed to > instantiate SLF4J LoggerFactory" > > Reported exception: > java.lang.NoClassDefFoundError: org/apache/log4j/Level > at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143) > at > org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122) > at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378) > at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328) > at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98) > > > How can I use logback instead ? > > > Thanks, > Vishwas > >
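The NoClassDefFoundError: org/apache/log4j/Level above is most likely because the slf4j-log4j12 binding jar is still in lib/ and tries to bind to the now-deleted log4j classes. Per the Flink logging docs, switching to logback means swapping jars in lib/, roughly as below (exact version numbers are illustrative — check the ones matching your Flink release):

```text
lib/
  logback-classic-1.2.x.jar      <- add
  logback-core-1.2.x.jar         <- add
  log4j-over-slf4j-1.7.x.jar     <- add (routes log4j API calls from dependencies to SLF4J)
  log4j-1.2.17.jar               <- remove
  slf4j-log4j12-1.7.x.jar        <- remove (the log4j binding causing the error above)
```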
Re: Externalized checkpoints
Got it.Thank you On Thu, Aug 22, 2019 at 8:54 PM Congxian Qiu wrote: > Hi, Vishwas > > As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to > specify the maximum number of completed checkpoints to retain. > maybe you can also ref the external checkpoint cleanup type[2] config for > how to clean up the retained checkpoint[2] > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options > [2] > https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints > Best, > Congxian > > > Zhu Zhu 于2019年8月22日周四 上午10:13写道: > >> Hi Vishwas, >> >> You can configure "state.checkpoints.num-retained" to specify the max >> checkpoints to retain. >> By default it is 1. >> >> Thanks, >> Zhu Zhu >> >> Vishwas Siravara 于2019年8月22日周四 上午6:48写道: >> >>> I am also using exactly once checkpointing mode, I have a kafka source >>> and sink so both support transactions which should allow for exactly once >>> processing. Is this the reason why there is only one checkpoint retained ? >>> >>> Thanks, >>> Vishwas >>> >>> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara >>> wrote: >>> >>>> Hi peeps, >>>> I am externalizing checkpoints in S3 for my flink job and I retain them >>>> on cancellation. However when I look into my S3 bucket where the >>>> checkpoints are stored there is only 1 checkpoint at any point in time . Is >>>> this the default behavior of flink where older checkpoints are deleted when >>>> the current checkpoint completes ? Here are a few screenshots. What are >>>> your thoughts on restoring an older state which is not the previous state ? 
>>>> >>>> List contents of bucket at time 0 >>>> >>>> Object Name: >>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last >>>> modified time : Wed Aug 21 22:17:23 GMT 2019 >>>> Object Name: >>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast modified >>>> time : Wed Aug 21 22:17:24 GMT 2019 >>>> >>>> List contents of bucket at time 1 >>>> >>>> Printing last modified times >>>> Object Name: >>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last >>>> modified time : Wed Aug 21 22:23:24 GMT 2019 >>>> Object Name: >>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified >>>> time : Wed Aug 21 22:23:24 GMT 2019 >>>> >>>> Thanks, >>>> >>>> Vishwas >>>> >>>>
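The two settings Zhu Zhu and Congxian point to, as they would appear in flink-conf.yaml (values illustrative — the default for num-retained is 1, which explains why only one checkpoint directory is ever visible in the bucket):

```yaml
# keep the 3 most recent completed checkpoints instead of the default 1
state.checkpoints.num-retained: 3

# where externalized checkpoint metadata is written
state.checkpoints.dir: s3://vishwas.test1/checkpoints
```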
Use logback instead of log4j
Hi, From the flink doc, in order to use logback instead of log4j: "Users willing to use logback instead of log4j can just exclude log4j (or delete it from the lib/ folder)." https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html . However, when I delete it from lib/ and start the cluster, there are no logs generated; instead I see a console log which says "Failed to instantiate SLF4J LoggerFactory". Reported exception: java.lang.NoClassDefFoundError: org/apache/log4j/Level at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143) at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122) at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98) How can I use logback instead? Thanks, Vishwas
Re: Externalized checkpoints
I am also using exactly once checkpointing mode, I have a kafka source and sink so both support transactions which should allow for exactly once processing. Is this the reason why there is only one checkpoint retained ? Thanks, Vishwas On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara wrote: > Hi peeps, > I am externalizing checkpoints in S3 for my flink job and I retain them on > cancellation. However when I look into my S3 bucket where the checkpoints > are stored there is only 1 checkpoint at any point in time . Is this the > default behavior of flink where older checkpoints are deleted when the > current checkpoint completes ? Here are a few screenshots. What are your > thoughts on restoring an older state which is not the previous state ? > > List contents of bucket at time 0 > > Object Name: > checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last > modified time : Wed Aug 21 22:17:23 GMT 2019 > Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast > modified time : Wed Aug 21 22:17:24 GMT 2019 > > List contents of bucket at time 1 > > Printing last modified times > Object Name: > checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last > modified time : Wed Aug 21 22:23:24 GMT 2019 > Object Name: > checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified > time : Wed Aug 21 22:23:24 GMT 2019 > > Thanks, > > Vishwas > >
Externalized checkpoints
Hi peeps, I am externalizing checkpoints in S3 for my flink job and I retain them on cancellation. However when I look into my S3 bucket where the checkpoints are stored there is only 1 checkpoint at any point in time . Is this the default behavior of flink where older checkpoints are deleted when the current checkpoint completes ? Here are a few screenshots. What are your thoughts on restoring an older state which is not the previous state ? List contents of bucket at time 0 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last modified time : Wed Aug 21 22:17:23 GMT 2019 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast modified time : Wed Aug 21 22:17:24 GMT 2019 List contents of bucket at time 1 Printing last modified times Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last modified time : Wed Aug 21 22:23:24 GMT 2019 Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified time : Wed Aug 21 22:23:24 GMT 2019 Thanks, Vishwas
Flink logback
Hi all, I modified the logback.xml provided by the flink distribution, so now the logback.xml file looks like this: *${log.file} false %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n true INFOACCEPT DENY /var/mware/logs/APPLOGS/service.log /var/mware/logs/APPLOGS/Archive/service.%d{-MM-dd_HH}.log .gz %d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%nUTF-8 0 5 ERRORACCEPT DENY /var/mware/logs/APPLOGS/service-error.log /var/mware/logs/APPLOGS/Archive/service-error.%d{-MM-dd_HH}.log.gz %d{-MM-dd_HH:mm:ss.SSS} %p %c %m%n UTF-8 0 5 * I have two file appenders defined. However, my application log ends up going to the taskexecutor log file. How can I fix this issue? Thanks, Vishwas
Configuring logback
Hi guys, I am using logback for my application logs. I have logback.xml as part of my fat jar that I submit to flink via the command line, flink run "...". When I run my application from the IDE, the appenders are what I have set in my logback.xml, but when I run from the command line the appender defaults to the root logback in the flink installation directory. How can I make sure that my application logs go to the correct appender? Here is my logback.xml file, which is available on the classpath. true INFO ACCEPT DENY ${APP_LOG_ROOT}service.log ${APP_LOG_ROOT}Archive/service.%d{-MM-dd_HH}.log.gz %d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%n UTF-8 0 5 ERROR ACCEPT DENY ${APP_LOG_ROOT}service-error.log ${APP_LOG_ROOT}Archive/service-error.%d{-MM-dd_HH}.log.gz %d{-MM-dd_HH:mm:ss.SSS} %p %c %m%n UTF-8 0 5 %d{-MM-dd_HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %m%n UTF-8 Thanks, Vishwas
Re: Configuring logback for my flink job
My logback.xml is part of my application jar file. Do you mean I should externalize it and set its location via FLINK_CONF_DIR? Like export FLINK_CONF_DIR=/home/was/, where I have logback.xml in /home/was/? Thanks, Vishwas On Mon, Aug 19, 2019 at 10:31 PM Yang Wang wrote: > Hi Vishwas, > > If you mean to have your application logs to its configured appenders in > client, i think you could use your own FLINK_CONF_DIR environment. > Otherwise, we could not update the log4j/logback configuration.[1] > > > [1] > https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/flink#L52 > > Vishwas Siravara 于2019年8月19日周一 下午11:02写道: > >> Hi, >> I have a logback for my flink application which is packaged with the >> application fat jar. However when I submit my job from flink command line >> tool, I see that logback is set to >> -Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from >> the client log. >> >> As a result my application log ends up going to the client log. How can I >> change this behavior . I know a dirty fix is to add my logback config to >> the logback in /data/flink-1.7.2/conf/logback.xml . >> >> Any other suggestions on how I can have my application log to its >> configured appenders. >> >> Thanks, >> Vishwas >> >
Configuring logback for my flink job
Hi, I have a logback for my flink application which is packaged with the application fat jar. However when I submit my job from flink command line tool, I see that logback is set to -Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from the client log. As a result my application log ends up going to the client log. How can I change this behavior . I know a dirty fix is to add my logback config to the logback in /data/flink-1.7.2/conf/logback.xml . Any other suggestions on how I can have my application log to its configured appenders. Thanks, Vishwas
Re: Understanding job flow
I did not find this to be true. Here is my code snippet:

object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current
  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  // //TODO: Add this to sbt jvm, this should be set in sbt fork jvm. This is hack.
  // System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  // import java.lang.reflect.Field
  //
  // val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
  // fieldSysPath.setAccessible(true)
  // fieldSysPath.set(null, null)
  //
  // print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet)
  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)
  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)  // <-- the initialization in question
  sdsClient.init()

  /**
   * Start streaming job execution.
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {
    val env = ExecutionEnv.executionEnv(argMap)
    this.source = ExecutionEnv.sourceTopics
    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
    sourceAndSinkFn(env, source)
    env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
      env: StreamExecutionEnvironment,
      topics: List[String]) = {
    val dataStream = addSource(env)
    log.info("Subscribed to topics" + topics)
    val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {
      override def filter(value: GenericRecord): Boolean = {
        ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &
          ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
      }
    })
    val result = filteredStream.map(record =>
      encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))
    result.print()
    KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
    maptoEncrypt
  }

  private[flink] def applyValues(genericRecord: GenericRecord): mutable.Map[String, Any] = {
    collection.mutable.Map(genericRecord.getSchema.getFields.asScala
      .map(field => field.schema().getType match {
        case Schema.Type.LONG => field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
        case Schema.Type.INT => field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
        case Schema.Type.DOUBLE => field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
        case Schema.Type.STRING => field.name() -> genericRecord.get(field.name()).toString
        case _ => field.name() -> genericRecord.get(field.name()).toString
      }): _*)
  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    try {
      if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
        return payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
          payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
          payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
      }
      return payload + ("timestamp" -> System.currentTimeMillis())
    } catch {
      case e: Exception =>
        errorLogger.error("Unable to obtain epoch time, using current system time" + e.printStackTrace())
        return payload + ("timestamp" -> System.currentTimeMillis())
    }
  }
}

The initialization of the sds client (the sdsClient line marked above) is in the main thread, even before the job graph is created. However, when I run this code on a cluster with 3 task managers on different nodes, it is initialized each time on all 3 nodes (task managers). I wonder why this happens. Thanks, Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson wrote: > @transient or use a static factory. > > In Scala we use a @transient lazy val with an initializer to do this > > Sent from my iPhone > > On Aug 15, 2019, at 11:40 AM, Vishwas Siravara > wrote: > > Thanks Steven. Is there a way where in I can create a singleton instance > in each task
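Why this happens: vals in a Scala object body are run by each JVM (and each classloader) that loads the class, so the client is constructed once per task manager, not once per cluster. Steven's suggestion, lazily initializing inside the operator, gives one instance per JVM created only on first use. A minimal, Flink-free Java sketch of the initialization-on-demand holder idiom (names hypothetical; in a Flink job you would call getInstance() from the open() method of a rich function):

```java
// Initialization-on-demand holder: INSTANCE is created once per JVM, on the
// first getInstance() call, with thread safety guaranteed by class-loading
// rules -- no synchronized block needed.
public class ClientHolder {
    static int constructions = 0; // only to demonstrate single construction

    private ClientHolder() { constructions++; }

    private static class Holder {
        static final ClientHolder INSTANCE = new ClientHolder();
    }

    public static ClientHolder getInstance() { return Holder.INSTANCE; }

    public static void main(String[] args) {
        ClientHolder a = getInstance();
        ClientHolder b = getInstance();
        System.out.println((a == b) + " " + constructions);
    }
}
```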
Understanding job flow
Hi guys, I have a map job where I want to encrypt certain keys . I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when I have my job running on multiple task managers with parallelism > 1 ? Thanks, Vishwas
Flink job parallelism
Hi guys, I have a flink job which I want to run with a parallelism of 2. I run it from command line like : flink run -p 2 -C file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid My cluster has two task managers with only 1 task slot each. However when I look at the Web UI for my job , I see that one of the task managers is still available. But when I submit with the web UI , both the task managers are used for this job and I get a parallelism of 2. Can you help me with understanding as to why this happens ? Thank you Vishwas
Re: How can I pass multiple java options in standalone mode ?
No, this does not work: the env.java.opts config in flink-conf.yaml applies only to the job manager and task manager. When I say "flink run ..." from the command line, a new JVM is created which starts the main method, and that JVM does not get the env.java.opts set in flink-conf.yaml. This works when I set JVM_ARGS and export it before "flink run ...". Here is a similar link from stackoverflow: https://stackoverflow.com/questions/42344624/apache-flink-custom-java-options-are-not-recognized-inside-job Thanks, Vishwas On Thu, Aug 15, 2019 at 1:20 AM Yang Wang wrote: > Hi Vishwas > > The java option is used to start jobmanager and taskmanager jvm process. > It should take effect when you set is in the flink-conf.yaml on all nodes. > It is a flink cluster level configuration, not a job level. So i'm not > sure what do you want to do. > > > Best, > Yang > > Vishwas Siravara 于2019年8月15日周四 上午3:39写道: > >> Thanks a lot, I fixed that, so now this works when I submit my job with >> the flink UI but when I submit it via flink run(command line) it does not >> take this env.java.opts: -Dconfig.resource=qa.conf property . How can I >> pass the jvm property to flink run which is running standalone without >> resource manager. >> >> Thanks, >> Vishwas >> >> On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic < >> amastilo...@sightmachine.com> wrote: >> >>> It’s a YAML file, so I think you need to do something like >>> >>> env.java.opts: -Dconfig.resource=qa.conf >>> >>> On Aug 14, 2019, at 11:58 AM, Vishwas Siravara >>> wrote: >>> >>> When I add env.java.opts like this >>> env.java.opts:"-Dconfig.resource=qa.conf" >>> >>> I see an error in the log file saying : - Error while trying to split >>> key and value in configuration file >>> /data/flink-1.7.2/conf/flink-conf.yaml:248: >>> "env.java.opts:"-Dconfig.resource=qa.conf"" >>> >>> This is really confusing and I cant find any document on how I should >>> pass this option. 
>>> >>> Thanks, >>> Vishwas >>> >>> On Wed, Aug 14, 2019 at 12:40 PM Vishwas Siravara >>> wrote: >>> >>>> Is it possible for me to pass these arguments along with the job when I >>>> do flink run and then pass the jvm options. For example if I want to pass >>>> this parameter -Dconfig.resource=qa.conf and qa.conf is packaged in the >>>> job's fat jar then flink will not find this file if I pass >>>> -Dconfig.resource=qa.conf and qa.conf in env.java.opts. >>>> >>>> Thanks, >>>> Vishwas >>>> >>>> On Mon, Aug 12, 2019 at 6:00 PM Zili Chen wrote: >>>> >>>>> Hi Vishwas, >>>>> >>>>> Replace ',' with ' '(space) should work. >>>>> >>>>> Best, >>>>> tison. >>>>> >>>>> >>>>> Vishwas Siravara 于2019年8月13日周二 上午6:50写道: >>>>> >>>>>> Hi guys, >>>>>> I have this entry in flink-conf.yaml file for jvm options. >>>>>> env.java.opts: "-Djava.security.auth.login.config={{ >>>>>> flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ >>>>>> flink_installed_dir }}/krb5.conf" >>>>>> >>>>>> Is this supposed to be a , separated list ? I get a parse exception >>>>>> when the cluster starts. >>>>>> >>>>>> Thanks, >>>>>> Vishwas >>>>>> >>>>> >>>
External classpath
Hi guys, I'm very close to deploying my application in production, so I am trying to externalize some of the config files which have to be available on the classpath when I run my application via the flink command line interface. From the flink doc I can add to the classpath by: -C,--classpath Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}. So my job has 8 task managers (8 different nodes) with 8 slots in each. Does my external classpath have to be on an NFS share? Can I not have the config directory on each machine in the same location? For instance on Node 1 the config files are in the directory /home/was/classpathconfig/, and the same on every node. Does it have to be on an NFS? My command looks like this: flink run -C file://home/was/classpathconfig/ -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid Also I tried to put my files in s3 and tried to run flink run -C s3://flink.dev/config -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid Bad syntax for classpath: s3://flink.dev/config s3 does support URLClassLoader but I get the error saying bad syntax. Please let me know your thoughts. Thanks a lot to this community, I was able to write my code in a week. Thanks, Vishwas
How can I pass jvm options to flink when started from command line
I understand that when I run a flink job from command line it forks a jvm and runs the main method and the flink related code run in the task manager. So when I say "flink run " the main does not run on JobManager hence it does not take env.java.options set in the flink-conf.yaml as this applies to the job manager and task manager. Now how can I pass jvm options like -Dconfig.resource=qa.conf from command line ? Thanks, Vishwas
Re: How can I pass multiple java options in standalone mode ?
Thanks a lot, I fixed that, so now this works when I submit my job with the flink UI but when I submit it via flink run(command line) it does not take this env.java.opts: -Dconfig.resource=qa.conf property . How can I pass the jvm property to flink run which is running standalone without resource manager. Thanks, Vishwas On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic < amastilo...@sightmachine.com> wrote: > It’s a YAML file, so I think you need to do something like > > env.java.opts: -Dconfig.resource=qa.conf > > On Aug 14, 2019, at 11:58 AM, Vishwas Siravara > wrote: > > When I add env.java.opts like this > env.java.opts:"-Dconfig.resource=qa.conf" > > I see an error in the log file saying : - Error while trying to split > key and value in configuration file > /data/flink-1.7.2/conf/flink-conf.yaml:248: > "env.java.opts:"-Dconfig.resource=qa.conf"" > > This is really confusing and I cant find any document on how I should pass > this option. > > Thanks, > Vishwas > > On Wed, Aug 14, 2019 at 12:40 PM Vishwas Siravara > wrote: > >> Is it possible for me to pass these arguments along with the job when I >> do flink run and then pass the jvm options. For example if I want to pass >> this parameter -Dconfig.resource=qa.conf and qa.conf is packaged in the >> job's fat jar then flink will not find this file if I pass >> -Dconfig.resource=qa.conf and qa.conf in env.java.opts. >> >> Thanks, >> Vishwas >> >> On Mon, Aug 12, 2019 at 6:00 PM Zili Chen wrote: >> >>> Hi Vishwas, >>> >>> Replace ',' with ' '(space) should work. >>> >>> Best, >>> tison. >>> >>> >>> Vishwas Siravara 于2019年8月13日周二 上午6:50写道: >>> >>>> Hi guys, >>>> I have this entry in flink-conf.yaml file for jvm options. >>>> env.java.opts: "-Djava.security.auth.login.config={{ >>>> flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ >>>> flink_installed_dir }}/krb5.conf" >>>> >>>> Is this supposed to be a , separated list ? I get a parse exception >>>> when the cluster starts. 
>>>> >>>> Thanks, >>>> Vishwas >>>> >>> >
How can I pass multiple java options in standalone mode ?
Hi guys, I have this entry in flink-conf.yaml file for jvm options. env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf" Is this supposed to be a , separated list ? I get a parse exception when the cluster starts. Thanks, Vishwas
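As tison answers in the thread above, env.java.opts takes a single space-separated string rather than a comma-separated list; e.g. in flink-conf.yaml (paths illustrative):

```yaml
env.java.opts: "-Djava.security.auth.login.config=/path/to/kafka-jaas.conf -Djava.security.krb5.conf=/path/to/krb5.conf"
```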
Passing jvm options to flink
Hi, I am running flink on a standalone cluster without any resource manager like YARN or K8s. I am submitting my job using the command line "flink run ...". I have a couple of questions: 1. How can I pass JVM parameters to this job? I want to pass a parameter for a dylib like this: -Djava.library.path=/Users/vsiravar/temp/apollo/aipcryptoclient/lib . 2. If I want to add an external directory on my server to my classpath, how can I do that? Thanks for all the help. I am truly grateful for all the help I have got building my flink app. Thanks, Vishwas
Streaming from a file
Hi guys, Is it possible for Flink to stream from a Unix file system line by line? When I use readTextFile(path), which reads text files that respect the TextInputFormat specification, line by line, and returns them as Strings, the entire contents of the file come as a DataStream, over which I can iterate to extract each line as a string. Is it possible to stream a line at a time? Thanks for all the help so far.

Thanks,
Vishwas
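For what it's worth, readTextFile already emits one String record per line (each element of the resulting DataStream is a single line, not the whole file). A sketch of continuous line-by-line ingestion with the 1.7-era Scala DataStream API; the directory path and scan interval are illustrative:

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dir = "/data/incoming" // illustrative path

// Each element of `lines` is one line of text. PROCESS_CONTINUOUSLY
// re-scans the path every 1000 ms and emits content of new/modified files,
// instead of reading once and terminating (PROCESS_ONCE).
val lines: DataStream[String] = env.readFile(
  new TextInputFormat(new Path(dir)),
  dir,
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000L)
```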
Re: S3 checkpointing exception
I found the solution to this problem; it was a dependency issue, and I had to exclude "xml-apis" to get this fixed. Also, the s3-presto jar provides better error messages, which was helpful.

Thanks,
Vishwas

On Thu, Jul 18, 2019 at 8:14 PM Vishwas Siravara wrote:
> I am using an ECS S3 instance to checkpoint, with the following configuration:
>
> s3.access-key vdna_np_user
> s3.endpoint https://SU73ECSG**COM:9021
> s3.secret-key **
>
> I set the checkpoint in the code like:
> env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
>
> I have a bucket called vishwas.test1; should I first create a directory in S3 called checkpoints?
>
> I see this error in the log, what does this mean? Thank you so much for your help.
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3://vishwas.test1/checkpoint/35abe5cadda5ff77fd3347a956b6f1e2: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to create an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
> at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:83)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
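The exclusion described above can be sketched in Maven terms. The groupId/artifactId of the dependency that transitively pulls in xml-apis is hypothetical here; the real culprit can be located with mvn dependency:tree.

```xml
<!-- Sketch: com.example:some-library is a hypothetical placeholder for
     whichever dependency drags in the conflicting xml-apis jar. -->
<dependency>
  <groupId>com.example</groupId>
  <artifactId>some-library</artifactId>
  <version>1.0</version>
  <exclusions>
    <exclusion>
      <groupId>xml-apis</groupId>
      <artifactId>xml-apis</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```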
S3 checkpointing exception
I am using ecs S3 instance to checkpoint, I use the following configuration. s3.access-key vdna_np_user s3.endpoint https://SU73ECSG**COM:9021 s3.secret-key **I set the checkpoint in the code like env.setStateBackend(*new *FsStateBackend("s3://vishwas.test1/checkpoints")) I have a bucket called vishwas.test1, should I first create a directory in s3 called checkpoints first ? I see this error in the log , what does this mean ? Thank you so much for your help. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3://vishwas.test1/checkpoint/35abe5cadda5ff77fd3347a956b6f1e2: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to create an XMLReader: Couldn't initialize a SAX driver to create an XMLReader at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83) at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748)
Flink s3 wire log
Here is my wire log while trying to checkpoint to ecs S3. I see the request got a 404 , does this mean that it can't find the folder *checkpoints . *Since s3 does not have folders, what should I put there ? Thanks so much for all the help that you guys have provided so far. Really appreciate it. 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkSSLSocket - created: vishwas.test1.SU73ECSG2P1d.VISA.COM/192.168.38.48:9021 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator - Connection established 10.211.120.115:44746<->192.168.38.48:9021 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultManagedHttpClientConnection - http-outgoing-15: set socket timeout to 20 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec - Executing request HEAD /checkpoints/6a17b04819b5d567afd1376faa174e1d HTTP/1.1 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec - Proxy auth state: UNCHALLENGED 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> HEAD /checkpoints/6a17b04819b5d567afd1376faa174e1d HTTP/1.1 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> Host: vishwas.test1.SU73ECSG2P1d.VISA.COM:9021 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> x-amz-content-sha256: UNSIGNED-PAYLOAD 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> Authorization: AWS4-HMAC-SHA256 Credential=vdna_np_user/20190719/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, 
Signature=bf5910dae4c760b1ebd80307e51516f26a6fe5e8adfbd5fcb0a3f74da7c0891e 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> X-Amz-Date: 20190719T002936Z 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Linux/3.10.0-957.21.3.el7.x86_64 Java_HotSpot(TM)_64-Bit_Server_VM/25.131-b11 java/1.8.0_131 scala/2.11.12 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> amz-sdk-invocation-id: c5d5c46a-ff2d-1ff2-6b7a-f43ab9b78e5d 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> amz-sdk-retry: 0/0/500 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> Content-Type: application/octet-stream 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.headers - http-outgoing-15 >> Connection: Keep-Alive 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "HEAD /checkpoints/6a17b04819b5d567afd1376faa174e1d HTTP/1.1[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "Host: vishwas.test1.SU73ECSG2P1d.VISA.COM:9021[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "x-amz-content-sha256: UNSIGNED-PAYLOAD[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "Authorization: AWS4-HMAC-SHA256 Credential=vdna_np_user/20190719/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=bf5910dae4c760b1ebd80307e51516f26a6fe5e8adfbd5fcb0a3f74da7c0891e[\r][\n]" 2019-07-19 00:29:36,323 DEBUG 
org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "X-Amz-Date: 20190719T002936Z[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Linux/3.10.0-957.21.3.el7.x86_64 Java_HotSpot(TM)_64-Bit_Server_VM/25.131-b11 java/1.8.0_131 scala/2.11.12[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "amz-sdk-invocation-id: c5d5c46a-ff2d-1ff2-6b7a-f43ab9b78e5d[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "amz-sdk-retry: 0/0/500[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire- http-outgoing-15 >> "Content-Type: application/octet-stream[\r][\n]" 2019-07-19 00:29:36,323 DEBUG org.apache.flink.fs.s3base.shaded.org.apache.http.wire
Re: Providing external files to flink classpath
Does the -yt option work for a standalone cluster without a dedicated resource manager? This property file is read by one of the dependencies inside my application as a file, so I can't really use ParameterTool to parse the config file.

Thanks,
Vishwas

On Fri, Jun 28, 2019 at 11:08 PM Yun Tang wrote:
> Hi Vishwas
>
> 1. You could use '-yt' to ship specified files to the class path; please refer to [1] for more details.
> 2. If the properties are only loaded on the client side before executing the application, you could let your application just read the local property data. Flink supports loading properties via the ParameterTool [2].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
> [2] https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
>
> Best
> Yun Tang
>
> From: Vishwas Siravara
> Sent: Saturday, June 29, 2019 0:43
> To: user
> Subject: Providing external files to flink classpath
>
> Hi,
> I am trying to add external property files to the Flink classpath for my application. These files are not a part of the fat jar. I put them under the lib folder but Flink can't find them. How can I manage external property files that need to be read by Flink?
>
> Thanks,
> Vishwas
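One common workaround when a file must be on the cluster classpath (a sketch, assuming, as with the stock launch scripts, that only .jar files under lib/ are added to the classpath, which would explain why a bare properties file there is not found; qa.properties and the jar name are hypothetical):

```shell
# Sketch: bare files in lib/ are typically NOT on the classpath, only jars.
# Wrap the properties file in a jar, drop it into lib/, and restart the
# cluster so JobManager/TaskManager pick it up as a classpath resource.
jar cf my-config.jar qa.properties   # qa.properties is illustrative
cp my-config.jar /data/flink-1.7.2/lib/
```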
Questions about user doc.
Hey guys, In this document: https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html , there is a line at the beginning of the scheduling section which says: "A pipeline consists of multiple successive tasks, such as the n-th parallel instance of a MapFunction together with the n-th parallel instance of a ReduceFunction. Note that Flink often executes successive tasks concurrently." I am guessing this means that Flink executes successive tasks from different pipelines concurrently, right? I also don't fully understand IntermediateResultPartition and IntermediateDataSet; why are there two boxes in the diagram for the intermediate result after the first ExecutionJobVertex? Are there any more docs I can read to clearly understand these diagrams? Thanks for your help.

Thanks,
Vishwas
Unable to start task manager in debug mode
Hi guys, I am not able to start a standalone session with one task manager and one job manager on the same node after adding the debug option in flink-conf.yaml as env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 (https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters).

This is what my masters and slaves files look like:
cat masters
localhost:8081
[was@sl73rspapd031 conf]$ cat slaves
localhost

The job manager comes up but the task manager does not. From the log file:
ERROR: transport error 202: bind failed: Address already in use
ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [debugInit.c:750]

This is because the job manager binds 5005 to its process, and the task manager cannot do this since the port is already associated with the job manager. How can I start the task manager? Should I comment out the debug config from the yaml file after the job manager is up and start the task manager separately? Thanks for your help.
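One way to avoid the port clash without restarting processes in sequence (a sketch, assuming the per-process option keys env.java.opts.jobmanager and env.java.opts.taskmanager are available in your Flink version, which lets each process bind its own JDWP port):

```yaml
# Sketch: give the JobManager and TaskManager distinct debug ports so both
# JVMs on the same host can bind their JDWP agents. Ports are illustrative.
env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
```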
Providing external files to flink classpath
Hi, I am trying to add external property files to the Flink classpath for my application. These files are not a part of the fat jar. I put them under the lib folder but Flink can't find them. How can I manage external property files that need to be read by Flink?

Thanks,
Vishwas
Error checkpointing to S3 like FS (EMC ECS)
Hi, I am using flink version 1.7.2 , I am trying to use S3 like object storage EMC ECS( https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not all S3 apis are supported by EMC ESC according to this document. Here is my config s3.endpoint: SU73ECSG1P1d.***.COM s3.access-key: vdna_np_user security.ssl.rest.enabled: false web.timeout: 1 s3.secret-key: J*** I can access this bucket from s3cmd client. I set the state backend from my scala application env.setStateBackend(new FsStateBackend("s3://aip-featuretoolkit/checkpoints/")) However when I run my application I get this exception : ClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM 1336 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255) 1337 at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498) 1338 at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345) 1339 at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) 1340 at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1173) 1341 at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1153) 1342 at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296) 1343 at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157) 1344 ... 
10 more 1345 Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on aip-featuretoolkit: org.apache.flink.fs.s3base.shaded.co m.amazonaws.SdkClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM 1346 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177) 1347 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) 1348 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) 1349 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) 1350 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) 1351 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) 1352 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:372) 1353 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:308) 1354 at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125) 1355 at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395) 1356 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) 1357 at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) 1358 at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58) 1359 at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444) 1360 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:249) 1361 ... 
17 more 1362 Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM 1363 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114) 1364 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064) 1365 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) 1366 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) 1367 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) 1368 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) 1369 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) 1370 at
Re: Unable to set S3 like object storage for state backend.
Hi Ken, Thanks for reaching out, I created a compliant bucket with name aip-featuretoolkit. I now get the exception "Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.***.COM: Name or service not known" from org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.class line 56. Here is my config from flink-conf.yaml file. s3.endpoint: SU73ECSG1P1d.***.COM s3.access-key: vdna_np_user security.ssl.rest.enabled: false web.timeout: 1 s3.secret-key: J*** I have not supplied the port in the config file. Does it internally use 9021 ? Also I am running my application as a different user not what is specified in s3.access-key. Does that matter ? Thanks, Vishwas On Thu, Jun 20, 2019 at 5:06 PM Ken Krugler wrote: > > Hi Vishwas, > > It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as > AWS no longer allows bucket names to contain an underscore. > > I’m guessing that the Hadoop S3 code is trying to treat your path as a valid > URI, but the bucket name doesn’t conform, and thus you get the "null uri > host” issue. > > Could you try with a compliant bucket name? > > — Ken > > On Jun 20, 2019, at 2:46 PM, Vishwas Siravara wrote: > > Hi, > I am using flink version 1.7.2 , I am trying to use S3 like object > storage EMC ECS( > https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm) . > > I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for > s3 filesystem and I have placed it under the lib folder and is > available to flink in its class path. > > My flink-conf.yaml looks like this : > > s3.endpoint: SU73ECSG1P1d.***.COM:9021 > s3.access-key: vdna_np_user > security.ssl.rest.enabled: false > web.timeout: 1 > s3.secret-key: J*** > > And my code for statebackend is like this : > > env.setStateBackend(new > FsStateBackend("s3://aip_featuretoolkit/checkpoints/")) > > I have a bucket called aip_featuretoolkit in my s3 instance. I can > connect to s3 form s3 command line utilities. 
However I cannot > checkpoint with this configuration in flink. I get the following error > message > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Could not > retrieve JobResult. > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) > at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62) > at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19) > at com.visa.flink.cli.Main$.main(Main.scala:22) > at com.visa.flink.cli.Main.main(Main.scala) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: > Failed to submit job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJ
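The "Name or service not known" on aip-featuretoolkit.SU73ECSG1P1d.***.COM suggests the client is prepending the bucket name to the endpoint hostname (virtual-host-style addressing), which S3-compatible stores often do not resolve. A flink-conf.yaml sketch that forces path-style requests instead; this assumes Flink forwards s3.* keys to the underlying Hadoop s3a connector, and the redacted endpoint is kept as in the thread:

```yaml
# Sketch: explicit port plus path-style access, so requests go to
# https://SU73ECSG1P1d.***.COM:9021/aip-featuretoolkit/... instead of
# https://aip-featuretoolkit.SU73ECSG1P1d.***.COM:9021/...
s3.endpoint: SU73ECSG1P1d.***.COM:9021
s3.path.style.access: true
```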
Unable to set S3 like object storage for state backend.
Hi, I am using flink version 1.7.2 , I am trying to use S3 like object storage EMC ECS( https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm) . I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for s3 filesystem and I have placed it under the lib folder and is available to flink in its class path. My flink-conf.yaml looks like this : s3.endpoint: SU73ECSG1P1d.***.COM:9021 s3.access-key: vdna_np_user security.ssl.rest.enabled: false web.timeout: 1 s3.secret-key: J*** And my code for statebackend is like this : env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/")) I have a bucket called aip_featuretoolkit in my s3 instance. I can connect to s3 form s3 command line utilities. However I cannot checkpoint with this configuration in flink. I get the following error message Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult. at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62) at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19) at com.visa.flink.cli.Main$.main(Main.scala:22) at com.visa.flink.cli.Main.main(Main.scala) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job. 
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) ... 4 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176) at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: null uri host. This can be caused by unencoded / in the password string at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255) at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345) at