Re: OOM error for heap state backend.

2020-08-26 Thread Vishwas Siravara
Thanks Andrey,
My question is related to

The FsStateBackend is encouraged for:

   - Jobs with large state, long windows, large key/value states.
   - All high-availability setups.

How large is "large state" here, excluding any overhead added by the framework?

Best,
Vishwas

On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin 
wrote:

> Hi Vishwas,
>
>  is this quantifiable with respect to JVM heap size on a single node
>> without the node being used for other tasks ?
>
>
> I don't quite understand this question. I believe the recommendation in the
> docs has the same reasoning: with larger state objects the per-entry Java
> object overhead pays off, i.e. it is relatively smaller.
> RocksDB keeps state in memory and on disk in the serialized form.
> Therefore it usually has a smaller footprint.
> Other jobs in the same task manager can potentially use other state
> backend depending on their state requirements.
> All tasks in the same task manager share the JVM heap as the task manager
> runs one JVM system process on the machine where it is deployed.
>
> Best,
> Andrey
>
> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara 
> wrote:
>
>> Hi Andrey,
>> Thanks for getting back to me so quickly. The screenshots are for a 1GB
>> heap; the keys for the state are 20-character strings (20 bytes, we don't
>> have multi-byte characters). So the overhead seems to be quite large (4x)
>> even in comparison to the checkpoint size (which already adds an overhead).
>> In this document [1] it says to use the FS/Heap backend for large states; is
>> this quantifiable with respect to JVM heap size on a single node, without the
>> node being used for other tasks?
>> I have attached GC log for TM and JM
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>
>> Best,
>> Vishwas
>>
>> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>> I believe the screenshots are from a heap size of 1GB?
>>>
>>> There are indeed many internal Flink state objects. They are overhead
>>> which is required for Flink to organise and track the state on-heap.
>>> Depending on the actual size of your state objects, the overhead may be
>>> relatively large or small compared to the actual state size.
>>> For example, if you just keep integers in your state, then the overhead is
>>> probably a couple of times larger than the state itself.
>>> It is not easy to estimate the exact on-heap size without thorough analysis.
>>>
>>> The checkpoint has little overhead and includes only actual state data -
>>> your serialized state objects which are probably smaller than their heap
>>> representation.
>>>
>>> So my guess is that the heap representation of the state is much bigger
>>> compared to the checkpoint size.
>>>
>>> I also cc other people who might add more thoughts about on-heap state
>>> size.
>>>
>>> You could also provide GC logs as Xintong suggested.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
>>> wrote:
>>>
>>>> Hi Andrey and Xintong. 2.5 GB is from the Flink web UI
>>>> (checkpoint size). I took a heap dump and I could not find any memory leak
>>>> in user code. I see similar behaviour with a smaller heap size: on a 1GB
>>>> heap, the state size from the checkpoint UI is 180 MB. Attaching some
>>>> screenshots of heap profiles if it helps. So when the state grows, GC takes
>>>> a long time and sometimes the job manager removes the TM slot because of
>>>> a 1ms timeout and tries to restore the task in another task manager; this
>>>> creates a cascading effect and affects other jobs running on the cluster.
>>>> My tests were run in a single node cluster with 1 TM and 4 task slots with
>>>> a parallelism of 4.
>>>>
>>>> Best,
>>>> Vishwas
>>>>
>>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin 
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> If you use Flink 1.7, check the older memory model docs [1] because
>>>>> you referred to the new memory model of Flink 1.10 in your reference 2.
>>>>> Could you also share a screenshot where you get the state size of 2.5
>>>>> GB? Do you mean Flink WebUI?
>>>>> Generally, it is quite hard to estimate the on-heap size of state Java
>>>>> objects. I have never heard of such a Flink metric.

OOM error for heap state backend.

2020-08-21 Thread Vishwas Siravara
Hi guys,
I use flink version 1.7.2
I have a stateful streaming job which uses a keyed process function and the
heap state backend. Although I set the TM heap size to 16 GB, I get an OOM
error when the state size is around 2.5 GB (the state size is from the
dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe is
for off-heap native calls) [1]. For an 8 GB TM heap setting, the OOM errors
start showing up when the state size reaches 1 GB. This I find puzzling
because I would expect a lot more heap space for state when I change the
size to 16 GB; what fraction of the heap is used by the framework? [2]
Below is the stack trace for the exception. How can I increase my state
size on the heap?

2020-08-21 02:05:54,443 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Memory
usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB
(used/committed/max)]
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Direct
memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory:
1074692521
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Off-heap
pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace:
70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB
(used/committed/max)]
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Garbage
collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS
MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task
- KeyedProcess (1/4) (23946753549293edc23e88f257980cb4)
switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newInstance(Array.java:75)
at java.util.Arrays.copyOf(Arrays.java:3212)
at java.util.Arrays.copyOf(Arrays.java:3181)
at
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
at
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown
Source)
at
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)



[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

Best,
Vishwas
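
For readers hitting the same thing: a minimal flink-conf.yaml sketch of the
memory-related settings this message refers to (option names assumed from the
Flink 1.7 docs, values taken from the description above; not a tested
configuration):

taskmanager.heap.size: 16384m      # the 16 GB TM heap mentioned above
taskmanager.memory.fraction: 0.01  # fraction reserved for managed memory, kept tiny here
taskmanager.memory.off-heap: false # managed memory stays on the heap by default
state.backend: filesystem          # FsStateBackend: working state lives as objects on the JVM heap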


Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Thank you Chesnay.
Yes, but I could change the staging directory by adding
-Djava.io.tmpdir=/data/flink-1.7.2/tmp
to *env.java.opts* in the flink-conf.yaml file. Do you see any problem with
that?

Best,
Vishwas
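
For reference, the settings discussed in this thread as they would appear in
flink-conf.yaml (paths copied from the messages; s3.staging-directory is the
key Chesnay suggests below and applies to the presto-s3 filesystem; treat this
as a sketch, not a verified configuration):

io.tmp.dirs: /usr/mware/flink/tmp
env.java.opts: -Djava.io.tmpdir=/data/flink-1.7.2/tmp
s3.staging-directory: /usr/mware/flink/tmp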

On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler  wrote:

> Could you try adding this to your flink-conf.yaml?
>
> s3.staging-directory: /usr/mware/flink/tmp
>
> On 20/08/2020 20:50, Vishwas Siravara wrote:
>
> Hi Piotr,
> I did some analysis and realised that the temp files for s3
> checkpoints are staged in /tmp although the *io.tmp.dirs* is set to a
> different directory.
>
> ls -lrth
>
> drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
> -rw---. 1 was  was   505M Aug 20 18:45 
> presto-s3-8158855975833379228.tmp
> -rw---. 1 was  was   505M Aug 20 18:45 
> presto-s3-7048419193714606532.tmp
> drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
> [was@sl73rspapd031 tmp]$
>
> flink-conf.yaml configuration
>
> io.tmp.dirs: /usr/mware/flink/tmp
>
> The /tmp has only 2GB, is it possible to change the staging directory for s3 
> checkpoints ?
>
> Best,
>
> Vishwas
>
>
> On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara 
> wrote:
>
>> Hi Piotr,
>> Thank you for your suggestion. I will try that, are the temporary files
>> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ?
>> Would these files be the same size as checkpoints ?
>>
>>
>> Thanks,
>> Vishwas
>>
>> On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> As far as I know when uploading a file to S3, the writer needs to first
>>> create some temporary files on the local disks. I would suggest to double
>>> check all of the partitions on the local machine and monitor available disk
>>> space continuously while the job is running. If you are just checking the
>>> free space manually, you can easily miss the point in time when those
>>> temporary files grow too big and approach the available disk space,
>>> as I'm pretty sure those temporary files are cleaned up immediately after
>>> the exception you see is thrown.
>>>
>>> Piotrek
>>>
>>> czw., 20 sie 2020 o 00:56 Vishwas Siravara 
>>> napisał(a):
>>>
>>>> Hi guys,
>>>> I have a deduplication job that runs on flink 1.7, that has some state
>>>> which uses FsState backend. My TM heap size is 16 GB. I see the below error
>>>> while trying to checkpoint a state of size 2GB. There is enough space
>>>> available in s3, I tried to upload larger files and they were all
>>>> successful. There is also enough disk space in the local file system, the
>>>> disk utility tool does not show anything suspicious. Whenever I try to
>>>> start my job from the last successful checkpoint , it runs into the same
>>>> error. Can someone tell me what is the cause of this issue? Many thanks.
>>>>
>>>>
>>>> Note: This error goes away when I delete io.tmp.dirs and restart the
>>>> job from last checkpoint , but the disk utility tool does not show much
>>>> usage before deletion, so I am not able to figure out what the problem is.
>>>>
>>>> 2020-08-19 21:12:01,909 WARN
>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
>>>> not close the state stream for s3p://featuretoolkit.c
>>>> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
>>>> 1363 java.io.IOException: No space left on device
>>>> 1364 at java.io.FileOutputStream.writeBytes(Native Method)
>>>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>>> 1366 at
>>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>>> 1367 at
>>>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>>>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>>> 1370 at
>>>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
>>>> 1371 at
>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>>> 1372 at
>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr,
I did some analysis and realised that the temp files for s3 checkpoints are
staged in /tmp although the *io.tmp.dirs* is set to a different directory.

ls -lrth

drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$

flink-conf.yaml configuration

io.tmp.dirs: /usr/mware/flink/tmp


The /tmp has only 2 GB; is it possible to change the staging directory
for the s3 checkpoints?


Best,

Vishwas


On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara 
wrote:

> Hi Piotr,
> Thank you for your suggestion. I will try that, are the temporary files
> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ?
> Would these files be the same size as checkpoints ?
>
>
> Thanks,
> Vishwas
>
> On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> As far as I know when uploading a file to S3, the writer needs to first
>> create some temporary files on the local disks. I would suggest to double
>> check all of the partitions on the local machine and monitor available disk
>> space continuously while the job is running. If you are just checking the
>> free space manually, you can easily miss the point in time when those
>> temporary files grow too big and approach the available disk space,
>> as I'm pretty sure those temporary files are cleaned up immediately after
>> the exception you see is thrown.
>>
>> Piotrek
>>
>> czw., 20 sie 2020 o 00:56 Vishwas Siravara 
>> napisał(a):
>>
>>> Hi guys,
>>> I have a deduplication job that runs on flink 1.7, that has some state
>>> which uses FsState backend. My TM heap size is 16 GB. I see the below error
>>> while trying to checkpoint a state of size 2GB. There is enough space
>>> available in s3, I tried to upload larger files and they were all
>>> successful. There is also enough disk space in the local file system, the
>>> disk utility tool does not show anything suspicious. Whenever I try to
>>> start my job from the last successful checkpoint , it runs into the same
>>> error. Can someone tell me what is the cause of this issue? Many thanks.
>>>
>>>
>>> Note: This error goes away when I delete io.tmp.dirs and restart the
>>> job from last checkpoint , but the disk utility tool does not show much
>>> usage before deletion, so I am not able to figure out what the problem is.
>>>
>>> 2020-08-19 21:12:01,909 WARN
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
>>> not close the state stream for s3p://featuretoolkit.c
>>> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
>>> 1363 java.io.IOException: No space left on device
>>> 1364 at java.io.FileOutputStream.writeBytes(Native Method)
>>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>> 1366 at
>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>> 1370 at
>>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
>>> 1371 at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>> 1372 at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>>> 1373 at
>>> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>> 1374 at
>>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>> 1375 at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
>>> 1376 at
>>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
>>> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>>> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>>> 1379 at
>>> org.apache.fli

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
Hi Piotr,
Thank you for your suggestion. I will try that, are the temporary files
created in the directory set in *io.tmp.dirs* in the flink-conf.yaml ?
Would these files be the same size as checkpoints ?


Thanks,
Vishwas

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski  wrote:

> Hi,
>
> As far as I know when uploading a file to S3, the writer needs to first
> create some temporary files on the local disks. I would suggest to double
> check all of the partitions on the local machine and monitor available disk
> space continuously while the job is running. If you are just checking the
> free space manually, you can easily miss the point in time when those
> temporary files grow too big and approach the available disk space,
> as I'm pretty sure those temporary files are cleaned up immediately after
> the exception you see is thrown.
>
> Piotrek
>
> czw., 20 sie 2020 o 00:56 Vishwas Siravara 
> napisał(a):
>
>> Hi guys,
>> I have a deduplication job that runs on flink 1.7, that has some state
>> which uses FsState backend. My TM heap size is 16 GB. I see the below error
>> while trying to checkpoint a state of size 2GB. There is enough space
>> available in s3, I tried to upload larger files and they were all
>> successful. There is also enough disk space in the local file system, the
>> disk utility tool does not show anything suspicious. Whenever I try to
>> start my job from the last successful checkpoint , it runs into the same
>> error. Can someone tell me what is the cause of this issue? Many thanks.
>>
>>
>> Note: This error goes away when I delete io.tmp.dirs and restart the job
>> from last checkpoint , but the disk utility tool does not show much usage
>> before deletion, so I am not able to figure out what the problem is.
>>
>> 2020-08-19 21:12:01,909 WARN
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
>> not close the state stream for s3p://featuretoolkit.c
>> heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
>> 1363 java.io.IOException: No space left on device
>> 1364 at java.io.FileOutputStream.writeBytes(Native Method)
>> 1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
>> 1366 at
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>> 1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>> 1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>> 1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>> 1370 at
>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
>> 1371 at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> 1372 at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>> 1373 at
>> org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> 1374 at
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> 1375 at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
>> 1376 at
>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
>> 1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>> 1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>> 1379 at
>> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>> 1380 at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185)
>> 1381 at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84)
>> 1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> 1383 at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>> 1384 at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>> 1385 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> 1386 at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> 1387 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> 1388 at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> 1389 at

No space left on device exception

2020-08-19 Thread Vishwas Siravara
Hi guys,
I have a deduplication job that runs on Flink 1.7 and has some state,
using the FsStateBackend. My TM heap size is 16 GB. I see the error below
while trying to checkpoint a state of size 2GB. There is enough space
available in s3; I tried to upload larger files and they were all
successful. There is also enough disk space in the local file system, and the
disk utility tool does not show anything suspicious. Whenever I try to
start my job from the last successful checkpoint, it runs into the same
error. Can someone tell me what the cause of this issue is? Many thanks.


Note: This error goes away when I delete io.tmp.dirs and restart the job
from the last checkpoint, but the disk utility tool does not show much usage
before deletion, so I am not able to figure out what the problem is.

2020-08-19 21:12:01,909 WARN
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
not close the state stream for s3p://featuretoolkit.c
heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
1363 java.io.IOException: No space left on device
1364 at java.io.FileOutputStream.writeBytes(Native Method)
1365 at java.io.FileOutputStream.write(FileOutputStream.java:326)
1366 at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1367 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1368 at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
1369 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1370 at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
1371 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
1372 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
1373 at
org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
1374 at
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
1375 at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
1376 at
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
1377 at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
1378 at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
1379 at
org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
1380 at
org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:185)
1381 at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:84)
1382 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
1383 at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
1384 at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
1385 at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
1386 at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
1387 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
1388 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
1389 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
1390 at java.lang.Thread.run(Thread.java:748)
1391 Suppressed: java.io.IOException: No space left on device
1392 at java.io.FileOutputStream.writeBytes(Native Method)
1393 at java.io.FileOutputStream.write(FileOutputStream.java:326)
1394 at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1395 at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1396 at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1397 at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
1398 ... 21 more


Thanks,
Vishwas


Re: Non parallel file sources

2020-06-23 Thread Vishwas Siravara
Thanks that makes sense.

On Tue, Jun 23, 2020 at 2:13 PM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:

> Hi Nick,
>
> On a project I worked on, we simply made the file accessible on a shared
> NFS drive.
> Our source was custom, and we forced it to parallelism 1 inside the job,
> so the file wouldn't be read multiple times. The rest of the job was
> distributed.
> This was also on a standalone cluster. On a resource managed cluster I
> guess the resource manager could take care of copying the file for us.
>
> Hope this can help. If there would have been a better solution, I'm also
> happy to hear it :).
>
> Regards,
>
> Laurent.
>
>
> On Tue, Jun 23, 2020, 20:51 Nick Bendtner  wrote:
>
>> Hi guys,
>> What is the best way to process a file from a unix file system since
>> there is no guarantee as to which task manager will be assigned to process
>> the file. We run flink in standalone mode. We currently follow the brute
>> force way in which we copy the file to every task manager, is there a
>> better way to do this ?
>>
>>
>> Best,
>> Nick.
>>
>
> ♻ Be green, keep it on the screen
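
A minimal sketch of the approach Laurent describes above, assuming the file is
reachable from every machine (e.g. a shared NFS mount); the path and the
downstream logic are placeholders:

import org.apache.flink.streaming.api.scala._

object SingleReaderJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read the shared file with a single source subtask so it is read only once
    val lines = env
      .readTextFile("/mnt/shared/input.csv")
      .setParallelism(1)

    // the rest of the job can run with full parallelism again
    lines
      .map(_.toUpperCase)
      .setParallelism(4)
      .print()

    env.execute("single-reader file source")
  }
}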


Re: Providing hdfs name node IP for streaming file sink

2020-03-03 Thread Vishwas Siravara
Thanks Yang. Going with setting the HADOOP_CONF_DIR in the flink
application. It integrates neatly with flink.

Best,
Nick.
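
A sketch of what that looks like in code, under the assumptions from Yang's
reply below (HADOOP_CONF_DIR exported on both the client and the cluster, and
"myhdfs" being the name service defined in hdfs-site.xml); the schema and path
are placeholders:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

def parquetSink(schema: Schema): StreamingFileSink[GenericRecord] =
  StreamingFileSink
    // name-service path, resolved via the loaded HDFS client config;
    // no namenode host/port is hard-coded, so failover is handled by HDFS
    .forBulkFormat(new Path("hdfs://myhdfs/flink/test"),
      ParquetAvroWriters.forGenericRecord(schema))
    .build()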

On Mon, Mar 2, 2020 at 7:42 PM Yang Wang  wrote:

> It may work. However, you need to set your own retry policy (similar to
> `ConfiguredFailoverProxyProvider` in Hadoop).
> Also if you directly use namenode address and do not load HDFS
> configuration, some HDFS client configuration (e.g.
> dfs.client.*) will not take effect.
>
>
> Best,
> Yang
>
> Nick Bendtner  于2020年3月2日周一 下午11:58写道:
>
>> Thanks a lot Yang. What are your thoughts on catching the exception when
>> a name node is down and retrying with the secondary name node ?
>>
>> Best,
>> Nick.
>>
>> On Sun, Mar 1, 2020 at 9:05 PM Yang Wang  wrote:
>>
>>> Hi Nick,
>>>
>>> Certainly you could directly use "namenode:port" as the schema of you
>>> HDFS path.
>>> Then the hadoop configs (e.g. core-site.xml, hdfs-site.xml) will not be
>>> necessary.
>>> However, that also means you could not benefit from the HDFS
>>> high-availability [1].
>>>
>>> If your HDFS cluster is HA configured, i strongly suggest you to set the
>>> "HADOOP_CONF_DIR"
>>> for your Flink application. Both the client and cluster(JM/TM) side need
>>> to be set. Then
>>> your HDFS path could be specified like this "hdfs://myhdfs/flink/test".
>>> Given that "myhdfs"
>>> is the name service configured in hdfs-site.xml.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>>
>>> [1].
>>> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>>>
>>> Nick Bendtner  于2020年2月29日周六 上午6:00写道:
>>>
 To add to this question, do I need to setup env.hadoop.conf.dir to
 point to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/
 for the jvm ? Or is it possible to write to hdfs without any external
 hadoop config like core-site.xml, hdfs-site.xml ?

 Best,
 Nick.



 On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner 
 wrote:

> Hi guys,
> I am trying to write to hdfs from streaming file sink. Where should I
> provide the IP address of the name node ? Can I provide it as a part of 
> the
> flink-config.yaml file or should I provide it like this :
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>   .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"),
>     ParquetAvroWriters.forGenericRecord(schema))
>   .build();
>
>
> Best,
> Nick
>
>
>


Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Vishwas Siravara
Hi Khachatryan,
Thanks for your reply. Can you help me understand how it works with HDFS
specifically? Even a link to a document would help.


Best,
Vishwas
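
As context for Roman's answer (my summary, not from the thread): the
StreamingFileSink ties file finalization to checkpoints, so part files only
become visible in their final form when a checkpoint completes, which is why
exactly-once requires checkpointing to be enabled. A minimal sketch with a row
format and placeholder paths:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object HdfsSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once for the file sink relies on checkpointing being enabled
    env.enableCheckpointing(60000)

    val sink = StreamingFileSink
      .forRowFormat(new Path("hdfs://myhdfs/flink/out"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    env.fromElements("a", "b", "c").addSink(sink)
    env.execute("streaming file sink sketch")
  }
}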

On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Vishwas,
>
> Yes, Streaming File Sink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara 
> wrote:
>
>> Hi all,
>> I want to use the StreamingFile sink for writing data to hdfs. Can I
>> achieve exactly once semantics with this sink ?
>>
>>
>> Best,
>> HW.
>>
>


Exactly once semantics for hdfs sink

2020-02-10 Thread Vishwas Siravara
Hi all,
I want to use the StreamingFile sink for writing data to hdfs. Can I
achieve exactly once semantics with this sink ?


Best,
HW.


Unit testing filter function in flink

2019-12-19 Thread Vishwas Siravara
Hi guys,
I want to test a function like :

private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        return false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !value.get(StipFields.processingCode).toString.equals("33")
      }
    }
  })
}

How can I do this ?

Best,
Vishwas
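
One way to make this testable (a sketch of an approach, not the only option):
pull the anonymous FilterFunction out into a named class so its filter() method
can be called directly in a plain unit test, without starting a Flink job.
StipFields and ExecutionEnv below are the classes from the snippet above;
building the GenericRecord in a test would typically be done with Avro's
GenericRecordBuilder against your schema:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.FilterFunction

// same predicate as the anonymous function above, factored out for testing
class StipFilter extends FilterFunction[GenericRecord] {
  override def filter(value: GenericRecord): Boolean = {
    if (value == null || value.get(StipFields.requestMessageType) == null) {
      false
    } else {
      ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
        ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
        value.get(StipFields.rejectCode).asInstanceOf[Int] == 0 &&
        !value.get(StipFields.processingCode).toString.equals("33")
    }
  }
}

// in a test: assert(new StipFilter().filter(someRecord)); no Flink runtime needed,
// and filterStream can simply become dataStream.filter(new StipFilter)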


flink's hard dependency on zookeeper for HA

2019-11-06 Thread Vishwas Siravara
Hi all,
I am using flink 1.7.2 as a standalone cluster in high availability mode
with zookeeper. I have noticed that all flink processes go down once
zookeeper goes down ? Is this expected behavior since the leader election
has already happened and the job has been running for several hours.


Best,
Vishwas


Partitioning based on key flink kafka sink

2019-11-05 Thread Vishwas Siravara
Hi all,
I am using flink 1.7.0 and using this constructor

FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig)

From the doc it says this constructor uses the FlinkFixedPartitioner. I want to
partition based on key, so I tried to use this

public FlinkKafkaProducer(
    String defaultTopicId,
    KeyedSerializationSchema<IN> serializationSchema,
    Properties producerConfig,
    Optional<FlinkKafkaPartitioner<IN>> customPartitioner)

What should I pass in the optional field ? From the doc it says

@param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
*  If a partitioner is not provided, records
will be partitioned by the key of each record
*  (determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If the keys
*  are {@code null}, then records will be
distributed to Kafka partitions in a
*  round-robin fashion.

This is super confusing (contradictory, in a way) since the previous
constructor says that the FlinkFixedPartitioner will be used if a
customPartitioner is not present.

Best,
Vishwas
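
Based on the javadoc quoted above, a sketch of passing an empty Optional so that
records are partitioned by the serialized key rather than by the
FlinkFixedPartitioner (topic, schema and properties are placeholders):

import java.util.{Optional, Properties}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

def keyPartitionedProducer[T](topic: String,
                              schema: KeyedSerializationSchema[T],
                              props: Properties): FlinkKafkaProducer[T] =
  new FlinkKafkaProducer[T](
    topic,
    schema,
    props,
    // no custom partitioner: Kafka partitions by the key from serializeKey(),
    // or round-robin if the key is null (per the javadoc quoted above)
    Optional.empty[FlinkKafkaPartitioner[T]]())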


Re ordering events with flink

2019-11-01 Thread Vishwas Siravara
Hi guys,
I want to know if it's possible to sort events in a Flink data stream. I
know I can't sort a stream as a whole, but is there a way in which I can buffer
for a very short time and sort those events before sending them to a data sink?

In our scenario we consume from a Kafka topic which has multiple partitions,
but the data in these brokers is *not* partitioned by a key (it's round
robin). For example, we want to time-order transactions associated with a
particular account, but since the same account number ends up in
different partitions at the source for different transactions, we are not
able to maintain event-time order in our stream processing system: the
same account number ends up in different task managers and slots. We do
partition by account number when we send the events to the downstream
Kafka sink, so that transactions from the same account number end up in the
same partition. This is however not good enough, since the events are not
sorted at the source.

Any ideas for doing this is much appreciated.


Best,
Vishwas
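
One common pattern for this (an assumption on my part, not an answer from this
thread): key the stream by account number and buffer events briefly in keyed
state, re-emitting them in timestamp order when an event-time timer fires. It
assumes timestamps and watermarks are assigned upstream; the event type and
field names are placeholders:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Txn(account: String, ts: Long, payload: String)

// buffers per-account events and emits them sorted once the watermark has
// passed ts + delayMs; events arriving later than that are not re-ordered
class ShortSortBuffer(delayMs: Long) extends KeyedProcessFunction[String, Txn, Txn] {
  private var buffer: ListState[Txn] = _

  override def open(parameters: Configuration): Unit =
    buffer = getRuntimeContext.getListState(new ListStateDescriptor[Txn]("buffer", classOf[Txn]))

  override def processElement(value: Txn,
                              ctx: KeyedProcessFunction[String, Txn, Txn]#Context,
                              out: Collector[Txn]): Unit = {
    buffer.add(value)
    ctx.timerService().registerEventTimeTimer(value.ts + delayMs)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, Txn, Txn]#OnTimerContext,
                       out: Collector[Txn]): Unit = {
    val (ready, rest) = buffer.get().asScala.toList.partition(_.ts + delayMs <= timestamp)
    ready.sortBy(_.ts).foreach(e => out.collect(e))
    buffer.update(rest.asJava)
  }
}

// usage: transactions.keyBy(_.account).process(new ShortSortBuffer(2000L))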


Re: Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Please ignore this email.

On Wed, Oct 16, 2019 at 1:40 PM Vishwas Siravara 
wrote:

> Hi guys,
> Is it necessary to specify a consumer group name for a kafka streaming job
> when checkpointing is enabled? Since the offsets are not stored in kafka
> how does specifying a consumer group help ?
>
> Best,
> Vishwas
>


Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Hi guys,
Is it necessary to specify a consumer group name for a kafka streaming job
when checkpointing is enabled? Since the offsets are not stored in kafka
how does specifying a consumer group help ?

Best,
Vishwas


Re: Flink restoring a job from a checkpoint

2019-10-09 Thread Vishwas Siravara
Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my
consumer group does not change? I start from the group offsets:
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka
source")
So when I restart the job, it should consume from the last offset committed
to Kafka, shouldn't it? Let me know what you think.

Best,
Vishwas
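
For context, a sketch of the checkpoint settings the commands discussed below
assume (externalized checkpoints retained on cancellation, so the chk-NNN
directories stay usable with `flink run -s`); interval and cleanup mode are
placeholders:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka offsets are committed back to Kafka when a checkpoint completes
env.enableCheckpointing(60000)
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)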
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu  wrote:

> Hi Vishwas
>
> Currently, Flink can only restore retained checkpoint or savepoint with
> parameter `-s`[1][2], otherwise, it will start from scratch.
>
> ```
> checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
> savepoint --> bin/flink run -s :savepointPath [:runArgs]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
>
> Best,
> Congxian
>
>
> Vishwas Siravara  于2019年10月9日周三 上午5:07写道:
>
>> Hi Yun,
>> Thanks for your reply. I do start from GROUP_OFFSET . Here is the code
>> snippet :
>>
>> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
>> source")
>>
>> I have also enabled and externalized checkpointing to S3 .
>> Why is it not recommended to just restart the job once I cancel it, as
>> long as the topology does not change? What is the advantage of
>> explicitly restoring from last checkpoint by passing the -s option to the
>> flink command line if it does the same thing? For instance if 
>> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>> is my last successful checkpoint, what is the difference between 1 and 2.
>>
>> 1. /usr/mware/flink/bin/flink run -d -C 
>> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
>> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
>> 2. /usr/mware/flink/bin/flink run -s 
>> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>>  -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
>> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
>>
>> Thanks,
>> Vishwas
>>
>> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang  wrote:
>>
>>> Hi Vishwas
>>>
>>> If you did not configure your
>>> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
>>> GROUP_OFFSET by default, which means "Start from committed offsets in ZK /
>>> Kafka brokers of a specific consumer group". And you need  to enable
>>> checkpoint so that kafka offsets are committed when checkpoint completes.
>>>
>>> In other words, even if you don't resume from checkpoint, just enable
>>> checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could
>>> restore from last committed offset if previous checkpoint completed [1][2].
>>> However, this is not really recommended, better to resume from last
>>> checkpoint [3]
>>>
>>> [1]
>>> https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
>>> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>>
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> --
>>> *From:* Vishwas Siravara 
>>> *Sent:* Wednesday, October 9, 2019 0:54
>>> *To:* user 
>>> *Subject:* Flink restoring a job from a checkpoint
>>>
>>> Hi guys,
>>> I have a flink streaming job which streams from a kafka source. There is
>>> no state in the job, just a simple filter , map and write to a kafka sink.
>>> Suppose I stop my job and then submit the job again to the cluster with the
>>> same consumer group, will the job restore automatically from the last
>>> successful checkpoint , since this is what is the last committed offset to
>>> kafka ?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>


Flink restoring a job from a checkpoint

2019-10-08 Thread Vishwas Siravara
Hi guys,
I have a flink streaming job which streams from a kafka source. There is no
state in the job, just a simple filter , map and write to a kafka sink.
Suppose I stop my job and then submit the job again to the cluster with the
same consumer group, will the job restore automatically from the last
successful checkpoint , since this is what is the last committed offset to
kafka ?

Thanks,
Vishwas


Increasing number of task slots in the task manager

2019-10-01 Thread Vishwas Siravara
Hi guys,
I get a Java heap space error when I have 1 GB of TM memory and 4 slots (we
have 4 cores in our lower environment) per TM, each slot having 1/4 GB of
managed memory.
From the Flink doc
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
I see that slots are allocated memory statically. When I change the TM
memory to 8GB, my job works fine without any heap issues with 4 slots. So
here each slot gets around 2GB of heap. In another environment we have 60
cores. Does it make sense for me to have 60 slots in the task manager for
8GB of TM heap? I assume that I will get a heap space error since each slot
will have 8/60 GB of memory. Is my assumption correct?

Thanks,
Vishwas
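
For reference, the two settings being compared, as flink-conf.yaml entries with
the numbers from this message (option names assumed from the Flink 1.x docs):

taskmanager.heap.size: 8192m       # the 8 GB TM heap that works with 4 slots
taskmanager.numberOfTaskSlots: 4   # the question: would 60 slots leave only ~8/60 GB (~136 MB) each?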


High availability flink job

2019-09-15 Thread Vishwas Siravara
Hi guys,
I have a Flink job running in standalone mode with a parallelism of >1
that produces data to a Kafka sink. My topic is replicated with a
replication factor of 2. Now suppose one of the Kafka brokers goes down:
will my streaming job fail? Is there a way in which I can continue
processing until that broker node comes back up? Also, the sink is partitioned
by a key.

Thanks,
Vishwas


Flink kafka producer partitioning scheme

2019-09-13 Thread Vishwas Siravara
Hi guys,
From the flink doc
*By default, if a custom partitioner is not specified for the Flink Kafka
Producer, the producer will use a FlinkFixedPartitioner that maps each
Flink Kafka Producer parallel subtask to a single Kafka partition (i.e.,
all records received by a sink subtask will end up in the same Kafka
partition).*

Does this mean that if my downstream topic has 40 partitions , I will need
40 parallel subtasks ?

Thanks,
Vishwas


externalizing config flies for flink class loader

2019-09-12 Thread Vishwas Siravara
I have a standalone cluster. I have added my own library (jar file) to the
lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
Now I want to externalize a property file which has to be read by this
library. Since this library is loaded by Flink's classloader and not the
application classloader, I cannot supply this using flink run -C ..., since
that works only for the user classloader.


Thanks,
Vishwas


Using FlinkKafkaConsumer API

2019-09-09 Thread Vishwas Siravara
I am using flink-kafka-connector and this is my dependency

"org.apache.flink" %% "flink-connector-kafka" % *"1.7.0"*,


When I look at my dependency tree, the Kafka client version is

 -org.apache.kafka:kafka-clients:2.0.1, which comes from the above package.


However when I run my code in the cluster I see that the kafka-client that
is loaded is

 0.10.2.0


Here is the task executor log :

2019-09-09 03:05:56,825 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 0.10.2.0

I am struggling to find out where this dependency is coming from. Our
broker version is not

compatible with this client. How can I force Flink to use 2.0.1?


Also the API I use for Kafka Consumer is

 private[flink] def sourceType: FlinkKafkaConsumer[GenericRecord] = {
val consumer = new FlinkKafkaConsumer[GenericRecord](
  source.asJava,
  AvroDeserialization.genericRecd,
  ExecutionEnv.streamProperties)
consumer
  }

}


I really appreciate any help. Is there any way I can find out where this
dependency comes from in the cluster, as it is clearly not coming
from my application?



Thanks,

Vishwas


Re: understanding task manager logs

2019-09-06 Thread Vishwas Siravara
Thanks, I'll check it out.

On Thu, Sep 5, 2019 at 5:22 AM Fabian Hueske  wrote:

> Hi Vishwas,
>
> This is a log statement from Kafka [1].
> Not sure when the AppInfoParser is created (the log message is written by
> the constructor).
>
> For Kafka versions > 1.0, I'd recommend the universal connector [2].
>
> Not sure how well it works if producers and consumers have different
> versions.
> Maybe Gordon (in CC) has some experience with that.
>
> Best, Fabian
>
> [1]
> https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java#L117
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector
>
> Am Di., 3. Sept. 2019 um 04:04 Uhr schrieb Vishwas Siravara <
> vsirav...@gmail.com>:
>
>> Hi guys,
>> I am using flink 1.7.2 and my application consumes from a kafka topic and
>> publish to another kafka topic which is in its own kafka environment
>> running a different kafka version,. I am using FlinkKafkaConsumer010 from
>> this dependency
>> *"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion. *
>>
>> In the task manager log I see these lines:
>>
>> 2019-09-02 02:57:59,840 INFO  
>> org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
>> logged in.
>> 2019-09-02 02:57:59,841 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh 
>> thread started.
>> 2019-09-02 02:57:59,842 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid 
>> starting at: Mon Sep 02 02:57:59 GMT 2019
>> 2019-09-02 02:57:59,843 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Mon 
>> Sep 02 12:57:59 GMT 2019
>> 2019-09-02 02:57:59,843 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh 
>> sleeping until: Mon Sep 02 11:14:13 GMT 2019
>> 2019-09-02 02:57:59,919 WARN  
>> org.apache.kafka.clients.consumer.ConsumerConfig  - The 
>> configuration 'zookeeper.connect' was supplied but isn't a known 
>> config.*2019-09-02 02:57:59,919 INFO  
>> org.apache.kafka.common.utils.AppInfoParser   - Kafka 
>> version : 0.10.2.0
>> *2019-09-02 02:57:59,919 INFO  org.apache.kafka.common.utils.AppInfoParser   
>> - Kafka commitId : 576d93a8dc0cf421
>>
>> Here if you see the Kafka version is 0.10.2.0. Is this the version the 
>> broker is running or is this coming from flink ? I have forced the 
>> kafka-client version
>>
>> to be 2.2.0
>>
>> "org.apache.kafka" % "kafka-clients" % "2.2.0" force()
>>
>> I also don't see 0.10.2.0 in the dependency tree of my build.
>>
>> Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ? What 
>> should I do if the consumer broker and producer broker are on different 
>> versions of kafka ?
>>
>>
>> Thanks,
>>
>> Vishwas
>>
>>
>> Thanks,
>>
>> Vishwas
>>
>>
>>
>>


Flink Kafka Connector

2019-09-05 Thread Vishwas Siravara
Hi guys,
I am using flink connector for kakfa from 1.9.0
Her is my sbt dependency :


"org.apache.flink" %% "flink-connector-kafka" % "1.9.0",


When I check the log file I see that the Kafka version is 0.10.2.0.
According to the docs, from 1.9.0 onwards the version should be
2.2.0. Why do I see this?


2019-09-06 01:41:43,534 INFO
org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0

This creates a big problem: I can connect to the broker but I don't see any
messages.


Why is this ?


Thanks,

Vishwas


understanding task manager logs

2019-09-02 Thread Vishwas Siravara
Hi guys,
I am using flink 1.7.2 and my application consumes from a kafka topic and
publish to another kafka topic which is in its own kafka environment
running a different kafka version,. I am using FlinkKafkaConsumer010 from
this dependency
*"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion. *

In the task manager log I see these lines:

2019-09-02 02:57:59,840 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin  -
Successfully logged in.
2019-09-02 02:57:59,841 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
thread started.
2019-09-02 02:57:59,842 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid
starting at: Mon Sep 02 02:57:59 GMT 2019
2019-09-02 02:57:59,843 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT
expires: Mon Sep 02 12:57:59 GMT 2019
2019-09-02 02:57:59,843 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
sleeping until: Mon Sep 02 11:14:13 GMT 2019
2019-09-02 02:57:59,919 WARN
org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'zookeeper.connect' was supplied but isn't a known
config.
*2019-09-02 02:57:59,919 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 0.10.2.0*
2019-09-02 02:57:59,919 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : 576d93a8dc0cf421

Here you can see that the Kafka version is 0.10.2.0. Is this the version the
broker is running, or is this coming from Flink? I have forced the
kafka-client version

to be 2.2.0

"org.apache.kafka" % "kafka-clients" % "2.2.0" force()

I also don't see 0.10.2.0 in the dependency tree of my build.

Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ?
What should I do if the consumer broker and producer broker are on
different versions of kafka ?


Thanks,

Vishwas


Thanks,

Vishwas


Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Thanks, I'll check it out.

On Thu, Aug 29, 2019 at 1:08 PM David Morin 
wrote:

> Vishwas,
>
> A config that works on my Kerberized cluster (Flink on Yarn).
> I hope this will help you.
>
> Flink conf:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /home/myuser/myuser.keytab
> security.kerberos.login.principal: myuser@
> security.kerberos.login.contexts: Client
>
> Properties related to security passed as argument of the
> FlinkKafkaConsumerXX constructor:
> sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\"myuser\" password=\"\";"
> sasl.mechanism=PLAIN
> security.protocol=SASL_SSL
>
> Le jeu. 29 août 2019 à 18:20, Vishwas Siravara  a
> écrit :
>
>> Hey David ,
>> My consumers are registered , here is the debug log. The problem is the
>> broker does not belong to me , so I can’t see what is going on there . But
>> this is a new consumer group , so there is no state yet .
>>
>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 will start reading the following 40 partitions from the 
>> committed group offsets in Kafka: 
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>
>> On Thu, Aug 29, 2019 at 11:39 AM David Morin 
>> wrote:
>>
>>> Hello Vishwas,
>>>
>>> You can use a keytab if you prefer. You generate a keytab for your user
>>> and then you can reference it in the Flink configuration.
>>> Then this keytab will be handled by Flink in a secure way and TGT will
>>> be created based on this keytab.
>>> However, that seems to be working.
>>> Did you check Kafka logs on the broker side ?
>>> Or did you check consumer offsets with Kafka tools in order to validate
>>> consumers are registered onto the different partitions of your topic ?
>>> You could try to switch to a different groupid for your consumer gro

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hey David,
My consumers are registered; here is the debug log. The problem is that the
broker does not belong to me, so I can't see what is going on there. But
this is a new consumer group, so there is no state yet.

 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 40 partitions from
the committed group offsets in Kafka:
[KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]

On Thu, Aug 29, 2019 at 11:39 AM David Morin 
wrote:

> Hello Vishwas,
>
> You can use a keytab if you prefer. You generate a keytab for your user
> and then you can reference it in the Flink configuration.
> Then this keytab will be handled by Flink in a secure way and TGT will be
> created based on this keytab.
> However, that seems to be working.
> Did you check Kafka logs on the broker side ?
> Or did you check consumer offsets with Kafka tools in order to validate
> consumers are registered onto the different partitions of your topic ?
> You could try to switch to a different groupid for your consumer group in
> order to force parallel consumption.
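>
> For example, something like this (exact flags depend on your Kafka version):
>
>   kafka-consumer-groups.sh --bootstrap-server <broker:9092> --describe --group <your-group-id>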
>
> On Thu, Aug 29, 2019 at 9:57 AM Vishwas Siravara  wrote:
>
>> I see this log as well , but I can't see any messages . I know for a fact
>> that the topic I am subscribed to has messages as I checked with a simple
>> java consumer with a different group.
>>
>>
>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 will start reading the following 40 partitions from the 
>> committed group offsets in Kafka: 
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
I see this log as well, but I can't see any messages. I know for a fact
that the topic I am subscribed to has messages, as I checked with a simple
Java consumer with a different group.


 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 40 partitions from
the committed group offsets in Kafka:
[KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]


On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara 
wrote:

> Hi guys,
> I am using kerberos for my kafka source. I pass the jaas config and
> krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>
> When I look at debug logs I see that the consumer was created with the
> following properties.
>
> 2019-08-29 06:49:18,298 INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig  - 
> ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
> check.crcs = true
> client.id = consumer-2
> connections.max.idle.ms = 54
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
>
>
> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 500
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 

Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hi guys,
I am using kerberos for my kafka source. I pass the jaas config and
krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf
-Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
-Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
-Djava.security.krb5.conf=/home/was/Jaas/krb5.conf

When I look at debug logs I see that the consumer was created with the
following properties.

2019-08-29 06:49:18,298 INFO
org.apache.kafka.clients.consumer.ConsumerConfig  -
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [sl73oprdbd018.visa.com:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1


group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer


I can also see that the kerberos login is working fine. Here is the log for it:



2019-08-29 06:49:18,312 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin  -
Successfully logged in.
2019-08-29 06:49:18,313 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
thread started.
2019-08-29 06:49:18,314 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid
starting at: Thu Aug 29 06:49:18 GMT 2019
2019-08-29 06:49:18,314 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT
expires: Thu Aug 29 16:49:18 GMT 2019
2019-08-29 06:49:18,315 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
sleeping until: Thu Aug 29 15:00:10 GMT 2019
2019-08-29 06:49:18,316 WARN
org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'zookeeper.connect' was supplied but isn't a known
config.
2019-08-29 06:49:18,316 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 0.10.2.0
2019-08-29 06:49:18,316 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : 576d93a8dc0cf421


I then see this log :

INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633
rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)



The problem is that I do not see any error log, but there is no data being
processed by the consumer, and it has been a nightmare to debug.


Thanks for all the help.


Thanks,
Vishwas


Re: Loading dylibs

2019-08-28 Thread Vishwas Siravara
Yes, this is exactly what happens. As a workaround I created a small jar
file that loads the dylib and placed it under the lib folder; that library
is in provided scope in my actual job, so the dylib gets loaded only once
when the TM/JM JVM starts.
What I found interesting in my old approach was that even when I checked
whether the dylib had already been loaded in the current thread, I still
got the UnsatisfiedLinkError, even though the dylib was loaded in the
task manager.
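
For reference, the loader in that small jar is essentially just a guarded singleton, along the lines of this sketch (the library name is from my job, everything else is illustrative):

  object NativeLibLoader {
    // guards System.loadLibrary so it runs at most once per JVM,
    // i.e. once per task manager / job manager process
    @volatile private var loaded = false

    def ensureLoaded(): Unit = synchronized {
      if (!loaded) {
        System.loadLibrary("vibesimplejava")
        loaded = true
      }
    }
  }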

On Wed, Aug 28, 2019 at 7:04 AM Aleksey Pak  wrote:

> Hi Vishwas,
>
> There is a known issue in the Flink Jira project [1].
> Is it possible that you have encountered the same problem?
>
> [1]: https://issues.apache.org/jira/browse/FLINK-11402
>
> Regards,
> Aleksey
>
>
> On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara 
> wrote:
>
>> Hi Jörn,
>> I tried that. Here is my snippet :
>>
>> String[] loadedlibs =  
>> getLoadedLibraries(Thread.currentThread().getContextClassLoader());
>> if(!containsVibeSimpleLib(loadedlibs)) {
>> System.loadLibrary("vibesimplejava");
>> }
>>
>> Now I get the exception "Unexpected error java.lang.UnsatisfiedLinkError:
>> com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I", which means
>> that it could not find vibesimplejava in the loaded libs. But I know that
>> the if block was not executed, because vibesimplejava was present in
>> loadedlibs (the control never went inside the if block). Any other suggestions?
>>
>> Thanks,
>> Vishwas
>>
>>
>>
>>
>>
>>
>> On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke 
>> wrote:
>>
>>> I don’t know Dylibs in detail, but can you call a static method where it
>>> checks if it has been already executed and if not then it loads the library
>>> (Singleton pattern)?
>>>
>>> Am 27.08.2019 um 06:39 schrieb Vishwas Siravara :
>>>
>>> Hi guys,
>>> I have a flink application that loads a dylib like this
>>>
>>> System.loadLibrary("vibesimplejava");
>>>
>>>
>>> The application runs fine , when I restart the job I get this exception
>>> :
>>>
>>> com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected 
>>> errorjava.lang.UnsatisfiedLinkError: Native Library 
>>> /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so
>>>  already loaded in another classloader
>>>
>>> This happens because the dylib has already been loaded once by the
>>> taskmanger, how can I mitigate this? It seems problematic if two
>>> applications are loading the same dylib.
>>>
>>> Thanks,
>>> Vishwas
>>>
>>>


Re: Loading dylibs

2019-08-27 Thread Vishwas Siravara
Hi Jörn,
I tried that. Here is my snippet :

String[] loadedlibs =
getLoadedLibraries(Thread.currentThread().getContextClassLoader());
if(!containsVibeSimpleLib(loadedlibs)) {
System.loadLibrary("vibesimplejava");
}

Now I get the exception "Unexpected error java.lang.UnsatisfiedLinkError:
com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I", which means
that it could not find vibesimplejava in the loaded libs. But I know that
the if block was not executed, because vibesimplejava was present in
loadedlibs (the control never went inside the if block). Any other suggestions?

Thanks,
Vishwas






On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke  wrote:

> I don’t know Dylibs in detail, but can you call a static method where it
> checks if it has been already executed and if not then it loads the library
> (Singleton pattern)?
>
> Am 27.08.2019 um 06:39 schrieb Vishwas Siravara :
>
> Hi guys,
> I have a flink application that loads a dylib like this
>
> System.loadLibrary("vibesimplejava");
>
>
> The application runs fine , when I restart the job I get this exception :
>
> com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected 
> errorjava.lang.UnsatisfiedLinkError: Native Library 
> /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so
>  already loaded in another classloader
>
> This happens because the dylib has already been loaded once by the
> taskmanger, how can I mitigate this? It seems problematic if two
> applications are loading the same dylib.
>
> Thanks,
> Vishwas
>
>


Loading dylibs

2019-08-26 Thread Vishwas Siravara
Hi guys,
I have a flink application that loads a dylib like this

System.loadLibrary("vibesimplejava");


The application runs fine , when I restart the job I get this exception :

com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected
errorjava.lang.UnsatisfiedLinkError: Native Library
/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so
already loaded in another classloader

This happens because the dylib has already been loaded once by the
taskmanager. How can I mitigate this? It seems problematic if two
applications are loading the same dylib.

Thanks,
Vishwas


Re: Using shell environment variables

2019-08-25 Thread Vishwas Siravara
You can also link at runtime by providing the path to the dylib via
-Djava.library.path= in the JVM options for the task manager.
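
For example (the path below is from my setup; adjust as needed):

  env.java.opts: -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/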

On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu  wrote:

> Hi Abhishek,
>
> You need to export the environment variables on all the worker
> machines(not the machine to submit the job).
>
> Alternatively, if you are submitting the job to a yarn cluster, you can
> use flink conf prefix "containerized.taskmanager.env." to add environment
> variables to Flink's task manager process.
> For example for passing LD_LIBRARY_PATH as an env variable to the workers,
> set: containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native" in
> the flink-conf.yaml.
>
> Thanks,
> Zhu Zhu
>
>> On Sun, Aug 25, 2019 at 2:48 AM Abhishek Jain  wrote:
>
>> Hi Miki,
>> Thanks for your reply. ParameterTool will only help in making the value
>> accessible through ParameterTool.get(). However, I need a way of accessing
>> the value using "System.getenv" since the underlying library uses it so.
>>
>> On Sat, 24 Aug 2019 at 23:04, miki haiat  wrote:
>>
>>> Did you register your system environment parameter ?
>>>
>>> You can find here several ways to use configuration data [1]
>>>
>>> 1.
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html
>>>
>>>
>>> On Sat, Aug 24, 2019, 20:26 Abhishek Jain  wrote:
>>>
 Hi!

 I am using a library that depends on a certain environment variable set
 (mandatorily). Now, I've exported this variable in my environment but
 somehow it's not being read by the task manager. Following is the exception
 I get when I try to run the job:

 Caused by: com.example.MyCustomException: Env token is null
 at com.example.AerospikeSink.open(AerospikeSink.java:47)
 at
 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at
 org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:745)

 Here's the code that throws this exception:

 @Override
 public void open(Configuration config) throws Exception {
 if (System.getenv("API_TOKEN") == null) {
 throw new MyCustomException("Env token is null");
 }
 }

 My question: Is there an alternative to System.getenv() that I can use
 to access environment variables inside of flink task?

 ( P.S. I've only copied relevant code snippet to avoid confusion. I do
 intend to use API_TOKEN later on. )

 --
 Warm Regards,
 Abhishek Jain

>>>
>>
>> --
>> Warm Regards,
>> Abhishek Jain
>>
>


Re: Use logback instead of log4j

2019-08-25 Thread Vishwas Siravara
Any idea on how I can use logback instead?

On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara 
wrote:

> Hi ,
> From the flink doc , in order to use logback instead of log4j " Users
> willing to use logback instead of log4j can just exclude log4j (or delete
> it from the lib/ folder)."
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html
>  .
>
> However when i delete it from the lib and start the cluster , there are no
> logs generated , instead I see console log which says "Failed to
> instantiate SLF4J LoggerFactory"
>
> Reported exception:
> java.lang.NoClassDefFoundError: org/apache/log4j/Level
> at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143)
> at 
> org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122)
> at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98)
>
>
> How can I use logback instead ?
>
>
> Thanks,
> Vishwas
>
>


Re: Externalized checkpoints

2019-08-25 Thread Vishwas Siravara
Got it. Thank you.

On Thu, Aug 22, 2019 at 8:54 PM Congxian Qiu  wrote:

> Hi, Vishwas
>
> As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to
> specify the maximum number of completed checkpoints to retain.
> maybe you can also ref the external checkpoint cleanup type[2] config for
> how to clean up the retained checkpoint[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints
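>
> For example, to keep the three most recent checkpoints (the value is illustrative), flink-conf.yaml would have:
>
>   state.checkpoints.num-retained: 3
>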
> Best,
> Congxian
>
>
> On Thu, Aug 22, 2019 at 10:13 AM Zhu Zhu  wrote:
>
>> Hi Vishwas,
>>
>> You can configure "state.checkpoints.num-retained" to specify the max
>> checkpoints to retain.
>> By default it is 1.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Thu, Aug 22, 2019 at 6:48 AM Vishwas Siravara  wrote:
>>
>>> I am also using exactly once checkpointing mode, I have a kafka source
>>> and sink so both support transactions which should allow for exactly once
>>> processing. Is this the reason why there is only one checkpoint retained ?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
>>> wrote:
>>>
>>>> Hi peeps,
>>>> I am externalizing checkpoints in S3 for my flink job and I retain them
>>>> on cancellation. However when I look into my S3 bucket where the
>>>> checkpoints are stored there is only 1 checkpoint at any point in time . Is
>>>> this the default behavior of flink where older checkpoints are deleted when
>>>> the current checkpoint completes ? Here are a few screenshots. What are
>>>> your thoughts on restoring an older state which is not the previous state ?
>>>>
>>>> List contents of bucket at time 0
>>>>
>>>> Object Name: 
>>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last
>>>>  modified time : Wed Aug 21 22:17:23 GMT 2019
>>>> Object Name: 
>>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast modified 
>>>> time : Wed Aug 21 22:17:24 GMT 2019
>>>>
>>>> List contents of bucket at time 1
>>>>
>>>> Printing last modified times
>>>> Object Name: 
>>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last
>>>>  modified time : Wed Aug 21 22:23:24 GMT 2019
>>>> Object Name: 
>>>> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified 
>>>> time : Wed Aug 21 22:23:24 GMT 2019
>>>>
>>>> Thanks,
>>>>
>>>> Vishwas
>>>>
>>>>


Use logback instead of log4j

2019-08-23 Thread Vishwas Siravara
Hi,
From the flink doc, in order to use logback instead of log4j: "Users
willing to use logback instead of log4j can just exclude log4j (or delete
it from the lib/ folder)."
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html

However, when I delete it from lib/ and start the cluster, there are no
logs generated; instead I see a console log which says "Failed to
instantiate SLF4J LoggerFactory".

Reported exception:
java.lang.NoClassDefFoundError: org/apache/log4j/Level
at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143)
at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122)
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98)


How can I use logback instead ?


Thanks,
Vishwas


Re: Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
I am also using exactly-once checkpointing mode. I have a Kafka source and
sink, both of which support transactions, which should allow for exactly-once
processing. Is this the reason why there is only one checkpoint retained?

Thanks,
Vishwas

On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
wrote:

> Hi peeps,
> I am externalizing checkpoints in S3 for my flink job and I retain them on
> cancellation. However when I look into my S3 bucket where the checkpoints
> are stored there is only 1 checkpoint at any point in time . Is this the
> default behavior of flink where older checkpoints are deleted when the
> current checkpoint completes ? Here are a few screenshots. What are your
> thoughts on restoring an older state which is not the previous state ?
>
> List contents of bucket at time 0
>
> Object Name: 
> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last
>  modified time : Wed Aug 21 22:17:23 GMT 2019
> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast 
> modified time : Wed Aug 21 22:17:24 GMT 2019
>
> List contents of bucket at time 1
>
> Printing last modified times
> Object Name: 
> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last
>  modified time : Wed Aug 21 22:23:24 GMT 2019
> Object Name: 
> checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified 
> time : Wed Aug 21 22:23:24 GMT 2019
>
> Thanks,
>
> Vishwas
>
>


Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
Hi peeps,
I am externalizing checkpoints in S3 for my flink job and I retain them on
cancellation. However when I look into my S3 bucket where the checkpoints
are stored there is only 1 checkpoint at any point in time . Is this the
default behavior of flink where older checkpoints are deleted when the
current checkpoint completes ? Here are a few screenshots. What are your
thoughts on restoring an older state which is not the previous state ?

List contents of bucket at time 0

Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
Last modified time : Wed Aug 21 22:17:23 GMT 2019
Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
Last modified time : Wed Aug 21 22:17:24 GMT 2019

List contents of bucket at time 1

Printing last modified times
Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
Last modified time : Wed Aug 21 22:23:24 GMT 2019
Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
Last modified time : Wed Aug 21 22:23:24 GMT 2019

Thanks,

Vishwas


Flink logback

2019-08-21 Thread Vishwas Siravara
Hi all,
I modified the logback.xml provided by the Flink distribution, so now the
logback.xml file looks like this:

[logback.xml content mangled by the mailing-list archive; the surviving fragments show:
 - an appender writing to ${log.file} with pattern
   %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
 - a rolling file appender with a level filter on INFO (ACCEPT/DENY) writing to
   /var/mware/logs/APPLOGS/service.log, rolling to
   /var/mware/logs/APPLOGS/Archive/service.%d{-MM-dd_HH}.log.gz,
   pattern %d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%n, UTF-8, rollover settings 0 and 5
 - a rolling file appender with a level filter on ERROR (ACCEPT/DENY) writing to
   /var/mware/logs/APPLOGS/service-error.log, rolling to
   /var/mware/logs/APPLOGS/Archive/service-error.%d{-MM-dd_HH}.log.gz,
   pattern %d{-MM-dd_HH:mm:ss.SSS} %p %c %m%n, UTF-8, rollover settings 0 and 5]


I have two file appenders defined. However, my application log ends up
going to the taskexecutor log file. How can I fix this issue?


Thanks,
Vishwas


Configuring logback

2019-08-20 Thread Vishwas Siravara
Hi guys,
I am using logback for my application logs. I have logback.xml as part of
my fat jar that I submit to flink via the command line ("flink run ...").
When I run my application from the IDE, the appenders are what I have set
in my logback.xml, but when I run from the command line the appenders
default to those in the flink installation directory. How can I make sure
that my application logs go to the correct appenders? Here is my
logback.xml file, which is available on the classpath.

[logback.xml content mangled by the mailing-list archive; the surviving fragments show:
 - a rolling file appender with a level filter on INFO (ACCEPT/DENY) writing to
   ${APP_LOG_ROOT}service.log, rolling to ${APP_LOG_ROOT}Archive/service.%d{-MM-dd_HH}.log.gz,
   pattern %d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%n, UTF-8, rollover settings 0 and 5
 - a rolling file appender with a level filter on ERROR (ACCEPT/DENY) writing to
   ${APP_LOG_ROOT}service-error.log, rolling to ${APP_LOG_ROOT}Archive/service-error.%d{-MM-dd_HH}.log.gz,
   pattern %d{-MM-dd_HH:mm:ss.SSS} %p %c %m%n, UTF-8, rollover settings 0 and 5
 - another appender with pattern %d{-MM-dd_HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %m%n, UTF-8]

Thanks,
Vishwas


Re: Configuring logback for my flink job

2019-08-20 Thread Vishwas Siravara
My logback.xml is part of my application jar file. Do you mean I should
externalize it and set its location via FLINK_CONF_DIR,
like export FLINK_CONF_DIR=/home/was/

where I have logback.xml in /home/was/ ?

Thanks,
Vishwas

On Mon, Aug 19, 2019 at 10:31 PM Yang Wang  wrote:

> Hi Vishwas,
>
> If you mean to have your application logs to its configured appenders in
> client, i think you could use your own FLINK_CONF_DIR environment.
> Otherwise, we could not update the log4j/logback configuration.[1]
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/flink#L52
>
> On Mon, Aug 19, 2019 at 11:02 PM Vishwas Siravara  wrote:
>
>> Hi,
>> I have a logback for my flink application which is packaged with the
>> application fat jar. However when I submit my job from flink command line
>> tool, I see that logback is set to
>> -Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from
>> the client log.
>>
>> As a result my application log ends up going to the client log. How can I
>> change this behavior . I know a dirty fix is to add my logback config to
>> the logback in /data/flink-1.7.2/conf/logback.xml .
>>
>> Any other suggestions on how I can have my application log to its
>> configured appenders.
>>
>> Thanks,
>> Vishwas
>>
>


Configuring logback for my flink job

2019-08-19 Thread Vishwas Siravara
Hi,
I have a logback configuration for my flink application which is packaged
with the application fat jar. However, when I submit my job from the flink
command line tool, I see in the client log that logback is set to
-Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml.

As a result my application log ends up going to the client log. How can I
change this behavior? I know a dirty fix is to add my logback config to
the logback.xml in /data/flink-1.7.2/conf/.

Any other suggestions on how I can have my application log to its
configured appenders.

Thanks,
Vishwas


Re: Understanding job flow

2019-08-16 Thread Vishwas Siravara
I did not find this to be true. Here is my code snippet.

object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current

  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  //  //TODO: Add this to sbt jvm, this should be set in sbt fork jvm.
This is hack.
  //  System.setProperty("java.library.path",
"/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  //  import java.lang.reflect.Field
  //
  //  val fieldSysPath: Field =
classOf[ClassLoader].getDeclaredField("sys_paths")
  //  fieldSysPath.setAccessible(true)
  //  fieldSysPath.set(null, null)
  //
  //  print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
ExecutionEnv.mockEncryption,
ExecutionEnv.enableEncryption,
ExecutionEnv
  .loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap,
ExecutionEnv.mockDecryption)

  sdsClient.init()

  /**
   * Start streaming job execution .
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {

val env = ExecutionEnv.executionEnv(argMap)
this.source = ExecutionEnv.sourceTopics

env.enableCheckpointing(1000)
env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
sourceAndSinkFn(env, source)
env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
env: StreamExecutionEnvironment,
topics: List[String]) = {
val dataStream = addSource(env)
log.info("Subscribed to topics" + topics)

val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {

  override def filter(value: GenericRecord): Boolean = {

ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString)
& ExecutionEnv
  .pcrList.contains(value.get("CMLS_DEST_PCR").toString)
  }
})

val result = filteredStream.map(record =>
  encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))

result.print()
KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(maptoEncrypt:
mutable.Map[String, Any]): mutable.Map[String, Any] = {
aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String,
AnyRef]].asJava)
maptoEncrypt
  }

  private[flink] def applyValues(
genericRecord: GenericRecord): mutable.Map[String, Any] = {

collection.mutable.Map(genericRecord.getSchema.getFields.asScala
  .map(field =>
field.schema().getType match {
  case Schema.Type.LONG =>
field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
  case Schema.Type.INT =>
field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
  case Schema.Type.DOUBLE =>
field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
  case Schema.Type.STRING =>
field.name() -> genericRecord.get(field.name()).toString
  case _ =>
field.name() -> genericRecord.get(field.name()).toString
}): _*)

  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]):
mutable.Map[String, Any] = {
try {
  if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
return payload + ("timestamp" ->
TimeUtility.convertDateStringToLong(payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
payload("CMLS_AUTH_TIME").asInstanceOf[Int]));
  }
  return payload + ("timestamp" -> System.currentTimeMillis())
} catch {
  case e: Exception => {
errorLogger.error("Unable to obtain epoch time, using
currentSystem time" + e.printStackTrace())
return payload + ("timestamp" -> System.currentTimeMillis())
  }
}
  }

}


The code that initializes the sds client (the sdsClient val above) runs in
the main thread, even before the job graph is created. However, when I run
this code on a cluster with 3 task managers on different nodes, it is
initialized on each of the 3 nodes (task managers). I wonder why this
happens.

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson 
wrote:

> @transient or use a static factory.
>
> In Scala we use a @transient lazy val with an initializer to do this
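>
> A minimal sketch of that pattern (the hypothetical ExpensiveClient below stands in for your encryptor):
>
>   import org.apache.flink.api.common.functions.RichMapFunction
>
>   class ExpensiveClient { def init(): Unit = (); def encrypt(s: String): String = s }
>
>   class EncryptFn extends RichMapFunction[String, String] {
>     // built lazily on each parallel task, never serialized from the client JVM
>     @transient private lazy val client: ExpensiveClient = {
>       val c = new ExpensiveClient
>       c.init()
>       c
>     }
>     override def map(value: String): String = client.encrypt(value)
>   }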
>
> Sent from my iPhone
>
> On Aug 15, 2019, at 11:40 AM, Vishwas Siravara 
> wrote:
>
> Thanks Steven. Is there a way where in I can create a singleton instance
> in each task 

Understanding job flow

2019-08-15 Thread Vishwas Siravara
Hi guys,
I have a map job where I want to encrypt certain keys. I initialize the
encryptor in the main method and apply it in the map function. How is this
encryptor shared when I have my job running on multiple task managers with
parallelism > 1?

Thanks,
Vishwas


Flink job parallelism

2019-08-15 Thread Vishwas Siravara
Hi guys,
I have a flink job which I want to run with a parallelism of 2.

I run it from command line like : flink run -p 2 -C
file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main
flink-job-assembly-0.1-SNAPSHOT.jar flink druid

My cluster has two task managers with only 1 task slot each.
However, when I look at the Web UI for my job, I see that one of the task
managers is still available. But when I submit with the web UI, both the
task managers are used for this job and I get a parallelism of 2.

Can you help me understand why this happens?

Thank you
Vishwas


Re: How can I pass multiple java options in standalone mode ?

2019-08-15 Thread Vishwas Siravara
No, this does not work: the env.java.opts config in flink-conf.yaml applies
only to the job manager and task manager. When I run "flink run ..." from
the command line, a new JVM is created that runs the main method, and that
JVM does not pick up the env.java.opts set in flink-conf.yaml. This works
when I set JVM_ARGS and export it before "flink run ...".
Here is a similar link from stackoverflow :
https://stackoverflow.com/questions/42344624/apache-flink-custom-java-options-are-not-recognized-inside-job
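
i.e. roughly:

  export JVM_ARGS="-Dconfig.resource=qa.conf"
  flink run -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid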

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 1:20 AM Yang Wang  wrote:

> Hi Vishwas
>
> The java option is used to start jobmanager and taskmanager jvm process.
> It should take effect when you set is in the flink-conf.yaml on all nodes.
> It is a flink cluster level configuration, not a job level. So i'm not
> sure what do you want to do.
>
>
> Best,
> Yang
>
> On Thu, Aug 15, 2019 at 3:39 AM Vishwas Siravara  wrote:
>
>> Thanks a lot, I fixed that, so now this works when I submit my job with
>> the flink UI but when I submit it via flink run(command line) it does not
>> take this env.java.opts: -Dconfig.resource=qa.conf property . How can I
>> pass the jvm property to flink run which is running standalone without
>> resource manager.
>>
>> Thanks,
>> Vishwas
>>
>> On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic <
>> amastilo...@sightmachine.com> wrote:
>>
>>> It’s a YAML file, so I think you need to do something like
>>>
>>>  env.java.opts: -Dconfig.resource=qa.conf
>>>
>>> On Aug 14, 2019, at 11:58 AM, Vishwas Siravara 
>>> wrote:
>>>
>>> When I add env.java.opts like this
>>>  env.java.opts:"-Dconfig.resource=qa.conf"
>>>
>>> I see an error in the log file saying :  - Error while trying to split
>>> key and value in configuration file
>>> /data/flink-1.7.2/conf/flink-conf.yaml:248:
>>> "env.java.opts:"-Dconfig.resource=qa.conf""
>>>
>>> This is really confusing and I cant find any document on how I should
>>> pass this option.
>>>
>>> Thanks,
>>> Vishwas
>>>
>>> On Wed, Aug 14, 2019 at 12:40 PM Vishwas Siravara 
>>> wrote:
>>>
>>>> Is it possible for me to pass these arguments along with the job when I
>>>> do flink run and then pass the jvm options. For example if I want to pass
>>>> this parameter -Dconfig.resource=qa.conf and qa.conf is packaged in the
>>>> job's fat jar then flink will not find this file if I pass
>>>> -Dconfig.resource=qa.conf and qa.conf in env.java.opts.
>>>>
>>>> Thanks,
>>>> Vishwas
>>>>
>>>> On Mon, Aug 12, 2019 at 6:00 PM Zili Chen  wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> Replace ',' with ' '(space) should work.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> On Tue, Aug 13, 2019 at 6:50 AM Vishwas Siravara  wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> I have this entry in flink-conf.yaml file for jvm options.
>>>>>> env.java.opts: "-Djava.security.auth.login.config={{
>>>>>> flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{
>>>>>> flink_installed_dir }}/krb5.conf"
>>>>>>
>>>>>> Is this supposed to be a , separated list ? I get a parse exception
>>>>>> when the cluster starts.
>>>>>>
>>>>>> Thanks,
>>>>>> Vishwas
>>>>>>
>>>>>
>>>


External classpath

2019-08-14 Thread Vishwas Siravara
Hi guys,
I'm very close to deploying my application in production, so I am trying to
externalize some of the config files which have to be available on the
classpath when I run my application via the flink command line interface.

>From the flink doc I can add to class path by

-C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.

So my job has 8 task managers (8 different nodes) with 8 slots each.
Does my external classpath have to be on an NFS share? Can I not have the
config directory on each machine in the same location? For instance, on
Node 1 the config files are in /home/was/classpathconfig/, and it is
the same on every node. Does it have to be on an NFS?
My command looks like this flink run -C file://home/was/classpathconfig/ -c
com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid

Also I tried to put my files in s3 and tried to run flink run -C s3://
flink.dev/config -c com.visa.flink.cli.Main
flink-job-assembly-0.1-SNAPSHOT.jar flink druid

Bad syntax for classpath: s3://flink.dev/config


s3 does support URLClassLoader but I get the error saying bad syntax.


Please let me know your thoughts. Thanks a lot to this community , I
was able to write my code in a week.


Thanks,

Vishwas


How can I pass jvm options to flink when started from command line

2019-08-14 Thread Vishwas Siravara
I understand that when I run a flink job from the command line it forks a JVM
and runs the main method, while the flink-related code runs in the task
manager. So when I say "flink run" the main does not run on the JobManager,
hence it does not pick up env.java.opts set in flink-conf.yaml, as that
applies to the job manager and task manager. Now how can I pass JVM options
like -Dconfig.resource=qa.conf from the command line?

Thanks,
Vishwas


Re: How can I pass multiple java options in standalone mode ?

2019-08-14 Thread Vishwas Siravara
Thanks a lot, I fixed that. Now this works when I submit my job with the
flink UI, but when I submit it via flink run (command line) it does not pick
up the env.java.opts: -Dconfig.resource=qa.conf property. How can I pass the
JVM property to flink run, which is running standalone without a resource
manager?

Thanks,
Vishwas

On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic <
amastilo...@sightmachine.com> wrote:

> It’s a YAML file, so I think you need to do something like
>
>  env.java.opts: -Dconfig.resource=qa.conf
>
> On Aug 14, 2019, at 11:58 AM, Vishwas Siravara 
> wrote:
>
> When I add env.java.opts like this
>  env.java.opts:"-Dconfig.resource=qa.conf"
>
> I see an error in the log file saying :  - Error while trying to split
> key and value in configuration file
> /data/flink-1.7.2/conf/flink-conf.yaml:248:
> "env.java.opts:"-Dconfig.resource=qa.conf""
>
> This is really confusing and I cant find any document on how I should pass
> this option.
>
> Thanks,
> Vishwas
>
> On Wed, Aug 14, 2019 at 12:40 PM Vishwas Siravara 
> wrote:
>
>> Is it possible for me to pass these arguments along with the job when I
>> do flink run and then pass the jvm options. For example if I want to pass
>> this parameter -Dconfig.resource=qa.conf and qa.conf is packaged in the
>> job's fat jar then flink will not find this file if I pass
>> -Dconfig.resource=qa.conf and qa.conf in env.java.opts.
>>
>> Thanks,
>> Vishwas
>>
>> On Mon, Aug 12, 2019 at 6:00 PM Zili Chen  wrote:
>>
>>> Hi Vishwas,
>>>
>>> Replace ',' with ' '(space) should work.
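>>>
>>> e.g.:
>>>
>>>   env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf -Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf"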
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Tue, Aug 13, 2019 at 6:50 AM Vishwas Siravara  wrote:
>>>
>>>> Hi guys,
>>>> I have this entry in flink-conf.yaml file for jvm options.
>>>> env.java.opts: "-Djava.security.auth.login.config={{
>>>> flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{
>>>> flink_installed_dir }}/krb5.conf"
>>>>
>>>> Is this supposed to be a , separated list ? I get a parse exception
>>>> when the cluster starts.
>>>>
>>>> Thanks,
>>>> Vishwas
>>>>
>>>
>


How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Vishwas Siravara
Hi guys,
I have this entry in flink-conf.yaml file for jvm options.
env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir
}}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir
}}/krb5.conf"

Is this supposed to be a , separated list ? I get a parse exception when
the cluster starts.

Thanks,
Vishwas


Passing jvm options to flink

2019-08-07 Thread Vishwas Siravara
Hi ,
I am running flink on a standalone cluster without any resource manager
like yarn or K8s. I am submitting my job using the command line "flink run
...". I have a couple of questions:

1. How can I pass JVM parameters to this job? I want to pass a parameter
for a dylib like this:
-Djava.library.path=/Users/vsiravar/temp/apollo/aipcryptoclient/lib

2. If I want to add an external directory on my server to my classpath,
how can I do that?

Thanks for all the help. I am truly grateful for all the help I have got
building my flink app.

Thanks,
Vishwas


Streaming from a file

2019-08-01 Thread Vishwas Siravara
Hi guys,
Is it possible for flink to stream from a unix file system line by line?
When I use readTextFile(path) ("Reads text files, i.e. files that respect
the TextInputFormat specification, line-by-line and returns them as Strings"),
the entire contents of the file come as a datastream, over which I can
iterate to extract each line as a string. Is it possible to stream a line
at a time? Thanks for all the help so far.


Thanks,
Vishwas


Re: S3 checkpointing exception

2019-07-20 Thread Vishwas Siravara
I found the solution to this problem: it was a dependency issue, and I had to
exclude "xml-apis" to get it fixed. Also, the s3 presto jar provides
better error messages, which was helpful.
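
For anyone hitting the same SAX/XMLReader error, the sbt change was roughly along these lines (a sketch; which dependency actually drags in xml-apis will differ per project):

  // build.sbt: strip the transitive xml-apis artifact from every dependency
  libraryDependencies ~= { deps =>
    deps.map(_.excludeAll(ExclusionRule(organization = "xml-apis")))
  }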

Thanks,
Vishwas

On Thu, Jul 18, 2019 at 8:14 PM Vishwas Siravara 
wrote:

> I am using ecs S3 instance to checkpoint, I use the following
> configuration.
>
>
> s3.access-key vdna_np_user
> s3.endpoint https://SU73ECSG**COM:9021
> s3.secret-key **I set the checkpoint in the code like
>  env.setStateBackend(*new *
> FsStateBackend("s3://vishwas.test1/checkpoints"))
>
> I have a bucket called vishwas.test1, should I first create a directory in
> s3 called checkpoints first ?
>
> I see this error in the log , what does this mean ? Thank you so much for
> your help.
>
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException:
> getFileStatus on
> s3://vishwas.test1/checkpoint/35abe5cadda5ff77fd3347a956b6f1e2:
> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
> Couldn't initialize a SAX driver to create an XMLReader: Couldn't
> initialize a SAX driver to create an XMLReader
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
>


S3 checkpointing exception

2019-07-18 Thread Vishwas Siravara
I am using ecs S3 instance to checkpoint, I use the following
configuration.


s3.access-key vdna_np_user
s3.endpoint https://SU73ECSG**COM:9021
s3.secret-key **

I set the checkpoint in the code like
env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))

I have a bucket called vishwas.test1; should I first create a directory in
s3 called checkpoints?

I see this error in the log; what does this mean? Thank you so much for
your help.

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException:
getFileStatus on
s3://vishwas.test1/checkpoint/35abe5cadda5ff77fd3347a956b6f1e2:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Couldn't initialize a SAX driver to create an XMLReader: Couldn't
initialize a SAX driver to create an XMLReader
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)


Flink s3 wire log

2019-07-18 Thread Vishwas Siravara
Here is my wire log while trying to checkpoint to ECS S3. I see the request
got a 404; does this mean that it can't find the folder "checkpoints"? Since
s3 does not have folders, what should I put there? Thanks so much for all
the help that you guys have provided so far. Really appreciate it.

2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkSSLSocket
- created: vishwas.test1.SU73ECSG2P1d.VISA.COM/192.168.38.48:9021
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator
 - Connection established 10.211.120.115:44746<->192.168.38.48:9021
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultManagedHttpClientConnection
 - http-outgoing-15: set socket timeout to 20
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec
 - Executing request HEAD
/checkpoints/6a17b04819b5d567afd1376faa174e1d HTTP/1.1
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec
 - Proxy auth state: UNCHALLENGED
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> HEAD /checkpoints/6a17b04819b5d567afd1376faa174e1d
HTTP/1.1
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> Host: vishwas.test1.SU73ECSG2P1d.VISA.COM:9021
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> x-amz-content-sha256: UNSIGNED-PAYLOAD
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> Authorization: AWS4-HMAC-SHA256
Credential=vdna_np_user/20190719/us-east-1/s3/aws4_request,
SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date,
Signature=bf5910dae4c760b1ebd80307e51516f26a6fe5e8adfbd5fcb0a3f74da7c0891e
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> X-Amz-Date: 20190719T002936Z
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271
Linux/3.10.0-957.21.3.el7.x86_64
Java_HotSpot(TM)_64-Bit_Server_VM/25.131-b11 java/1.8.0_131
scala/2.11.12
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> amz-sdk-invocation-id:
c5d5c46a-ff2d-1ff2-6b7a-f43ab9b78e5d
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> amz-sdk-retry: 0/0/500
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> Content-Type: application/octet-stream
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.headers -
http-outgoing-15 >> Connection: Keep-Alive
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "HEAD
/checkpoints/6a17b04819b5d567afd1376faa174e1d HTTP/1.1[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "Host:
vishwas.test1.SU73ECSG2P1d.VISA.COM:9021[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "x-amz-content-sha256: UNSIGNED-PAYLOAD[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "Authorization: AWS4-HMAC-SHA256
Credential=vdna_np_user/20190719/us-east-1/s3/aws4_request,
SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date,
Signature=bf5910dae4c760b1ebd80307e51516f26a6fe5e8adfbd5fcb0a3f74da7c0891e[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "X-Amz-Date: 20190719T002936Z[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271
Linux/3.10.0-957.21.3.el7.x86_64
Java_HotSpot(TM)_64-Bit_Server_VM/25.131-b11 java/1.8.0_131
scala/2.11.12[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "amz-sdk-invocation-id:
c5d5c46a-ff2d-1ff2-6b7a-f43ab9b78e5d[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "amz-sdk-retry: 0/0/500[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire-
http-outgoing-15 >> "Content-Type: application/octet-stream[\r][\n]"
2019-07-19 00:29:36,323 DEBUG
org.apache.flink.fs.s3base.shaded.org.apache.http.wire  

Re: Providing external files to flink classpath

2019-07-17 Thread Vishwas Siravara
Does the -yt option work for a standalone cluster without a dedicated resource
manager? Also, this property file is read by one of the dependencies inside
my application as a file, so I can't really use ParameterTool to parse the
config file.

Thanks,
Vishwas

On Fri, Jun 28, 2019 at 11:08 PM Yun Tang  wrote:

> Hi Vishwas
>
>
>1. You could use '-yt' to ship specified files to the class path,
>please refer to [1] for more details.
>    2. If the properties are only loaded on the client side before executing
>    the application, you could let your application just read the local
>    property file. Flink supports loading properties via the
>    ParameterTool [2] (see the sketch after the links below).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
> [2]
> https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
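>
> A minimal use of (2) would be, e.g. (the path is illustrative):
>
>   import org.apache.flink.api.java.utils.ParameterTool
>
>   val params = ParameterTool.fromPropertiesFile("/path/to/app.properties")
>   val someValue = params.get("some.key")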
>
> Best
> Yun Tang
>
> --
> From: Vishwas Siravara 
> Sent: Saturday, June 29, 2019 0:43
> To: user
> Subject: Providing external files to flink classpath
>
> Hi ,
> I am trying to add external property files to the flink classpath for
> my application. These files are not a part of the fat jar. I put them
> under the lib folder but flink cant find them? How can I manage
> external property files that needs to be read by flink ?
>
> Thanks,
> Vishwas
>


Questions about user doc.

2019-07-16 Thread Vishwas Siravara
Hey guys,
In this document :
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
,
there is a line in the beginning of the scheduling section which says that
: "A pipeline consists of multiple successive tasks, such as the
*n-th* parallel
instance of a MapFunction together with the *n-th* parallel instance of a
ReduceFunction. Note that Flink often executes successive tasks
concurrently:"

I am guessing this means that Flink executes successive tasks from
different pipelines successively right ?

I also don't fully understand Intermediate result partition and
Intermediate dataset: why are there two boxes in the diagram for the
intermediate result after the first execution job vertex? Are there any
more docs I can read to clearly understand these diagrams? Thanks for your
help.

Thanks,
Vishwas


Unable to start task manager in debug mode

2019-07-08 Thread Vishwas Siravara
Hi guys,
I am not able to start a standalone session with one task manager and one
job manager on the same node after adding the debug option in flink-conf.yaml
as env.java.opts:
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters).
This is what my masters and slaves files look like:

cat masters
localhost:8081
[was@sl73rspapd031 conf]$ cat slaves
localhost

The job manager comes up but the task manager does not. From the task manager log file:

ERROR: transport error 202: bind failed: Address already in use
ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports
initialized [debugInit.c:750]


This is because the job manager binds port 5005 to its process, and the task
manager cannot bind it since it is already taken by the job manager. How can I
start the task manager? Should I comment out the debug config in the yaml file
after the job manager is up and then start the task manager separately? Thanks
for your help.
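
One way to avoid the bind conflict, assuming the per-component options
env.java.opts.jobmanager and env.java.opts.taskmanager are available in this
Flink version, is to give each process its own debug port instead of using a
single env.java.opts (the port numbers here are arbitrary):

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006

With suspend=n the JVMs start without waiting for a debugger to attach; use
suspend=y only on the process you actually want to pause at startup.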


Providing external files to flink classpath

2019-06-28 Thread Vishwas Siravara
Hi,
I am trying to add external property files to the Flink classpath for
my application. These files are not part of the fat jar. I put them
under the lib folder, but Flink can't find them. How can I manage
external property files that need to be read by Flink?

Thanks,
Vishwas
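
One generic JVM approach, not taken from this thread, is to put the directory
(or jar) containing the properties file on the classpath and load it as a
resource; the file name below is hypothetical:

object PropsLoader {
  // Loads a hypothetical application.properties from the classpath, if present.
  def load(): java.util.Properties = {
    val props = new java.util.Properties()
    val in = getClass.getClassLoader.getResourceAsStream("application.properties")
    if (in != null) {
      try props.load(in) finally in.close()
    }
    props
  }
}

This only works if the directory or jar containing the file is actually on the
classpath; dropping a loose file under lib/ is not necessarily enough, since
the startup scripts build the classpath from the jar files there.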


Error checkpointing to S3 like FS (EMC ECS)

2019-06-24 Thread Vishwas Siravara
Hi,
I am using Flink version 1.7.2 and I am trying to use the S3-like object
storage EMC ECS
(https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not
all S3 APIs are supported by EMC ECS according to this document. Here
is my config:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I can access this bucket from the s3cmd client.

I set the state backend from my Scala application:
env.setStateBackend(new FsStateBackend("s3://aip-featuretoolkit/checkpoints/"))

However, when I run my application I get this exception:

ClientException: Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1336 at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
   1337 at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
   1338 at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
   1339 at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
   1340 at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1173)
   1341 at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1153)
   1342 at
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
   1343 at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
   1344 ... 10 more
   1345 Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException:
doesBucketExist on aip-featuretoolkit:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1346 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
   1347 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
   1348 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
   1349 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
   1350 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
   1351 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
   1352 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:372)
   1353 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:308)
   1354 at
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
   1355 at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
   1356 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
   1357 at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
   1358 at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
   1359 at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
   1360 at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
   1361 ... 17 more
   1362 Caused by:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1363 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
   1364 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
   1365 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
   1366 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
   1367 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
   1368 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
   1369 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
   1370 at

Re: Unable to set S3 like object storage for state backend.

2019-06-24 Thread Vishwas Siravara
Hi Ken,
Thanks for reaching out. I created a compliant bucket named
aip-featuretoolkit. I now get the exception "Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.***.COM: Name or service not
known" from
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker,
line 56. Here is my config from the flink-conf.yaml file:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I have not supplied the port in the config file; does it internally
use 9021? Also, I am running my application as a different user than
the one specified in s3.access-key. Does that matter?

Thanks,
Vishwas

On Thu, Jun 20, 2019 at 5:06 PM Ken Krugler  wrote:
>
> Hi Vishwas,
>
> It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as 
> AWS no longer allows bucket names to contain an underscore.
>
> I’m guessing that the Hadoop S3 code is trying to treat your path as a valid 
> URI, but the bucket name doesn’t conform, and thus you get the "null uri 
> host” issue.
>
> Could you try with a compliant bucket name?
>
> — Ken
>
> On Jun 20, 2019, at 2:46 PM, Vishwas Siravara  wrote:
>
> Hi,
> I am using Flink version 1.7.2 and I am trying to use the S3-like object
> storage EMC ECS
> (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).
>
> I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
> the S3 filesystem; I have placed it under the lib folder, and it is
> available to Flink on its classpath.
>
> My flink-conf.yaml looks like this:
>
> s3.endpoint: SU73ECSG1P1d.***.COM:9021
> s3.access-key: vdna_np_user
> security.ssl.rest.enabled: false
> web.timeout: 1
> s3.secret-key: J***
>
> And my code for the state backend is like this:
>
> env.setStateBackend(new 
> FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))
>
> I have a bucket called aip_featuretoolkit in my S3 instance. I can
> connect to S3 from the s3 command-line utilities. However, I cannot
> checkpoint with this configuration in Flink. I get the following error
> message:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not
> retrieve JobResult.
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
> at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
> at com.visa.flink.cli.Main$.main(Main.scala:22)
> at com.visa.flink.cli.Main.main(Main.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed to submit job.
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJ
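
The "Name or service not known" error for
aip-featuretoolkit.SU73ECSG1P1d.***.COM is typical of virtual-hosted-style
bucket addressing, which needs a DNS entry per bucket. A sketch of one
flink-conf.yaml setting that S3-compatible stores often need, assuming the
s3.* keys are mirrored to the shaded Hadoop fs.s3a.* properties as in the
flink-s3-fs-hadoop filesystem (whether ECS requires it here is an assumption,
not confirmed in this thread):

s3.endpoint: SU73ECSG1P1d.***.COM
s3.path.style.access: true

With path-style access the client addresses the bucket as
endpoint/bucket/key instead of bucket.endpoint/key, so no per-bucket DNS
entry is needed.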

Unable to set S3 like object storage for state backend.

2019-06-20 Thread Vishwas Siravara
Hi,
I am using Flink version 1.7.2 and I am trying to use the S3-like object
storage EMC ECS
(https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).

I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
the S3 filesystem; I have placed it under the lib folder, and it is
available to Flink on its classpath.

My flink-conf.yaml looks like this:

s3.endpoint: SU73ECSG1P1d.***.COM:9021
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

And my code for the state backend is like this:

env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))

I have a bucket called aip_featuretoolkit in my S3 instance. I can
connect to S3 from the s3 command-line utilities. However, I cannot
checkpoint with this configuration in Flink. I get the following error
message:

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Could not
retrieve JobResult.
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
at com.visa.flink.cli.Main$.main(Main.scala:22)
at com.visa.flink.cli.Main.main(Main.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit job.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set
up JobManager
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException:
Could not set up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID
counter: null uri host. This can be caused by unencoded / in the
password string
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at