Re: Impossible to get pending file names/paths on checkpoint?

2021-10-19 Thread Preston Price
So Fabian, I wanted to follow up on something, perhaps you can weigh in. I
had previously made the claim that getting things working with ADLS would
be trivial, but that has turned out not to be the case in Flink 1.14. I
have a sink that works in Flink 1.10 based off the old BucketingSink
<https://github.com/apache/flink/blob/release-1.10.0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java>
that writes to ADLS with the ABFS connector, and I had made the assumption
that this would work for the FileSink as well in 1.14. But it turns out
that ADLS is not supported by the FileSink, see this issue:
https://issues.apache.org/jira/browse/FLINK-17444

tl;dr: It seems that the FileSink expects the underlying FS to support a
RecoverableWriter, and the HadoopRecoverableWriter is explicitly excluded
from the Azure plugin, resulting in a NoClassDefFoundError.

Now, the old BucketingSink has been removed, replaced by the
StreamingFileSink, and then the FileSink. However, it appears that Azure
Storage is not supported by either of these file sinks. Is my understanding
correct? If so, this is incredibly disappointing; I hope that I am
misinterpreting something.

On Tue, Oct 12, 2021 at 4:47 PM Preston Price  wrote:

> Thanks for your thoughts here Fabian, I've responded inline but I also
> want to clarify the reason I need the file paths on commit.
> The FileSink works as expected in Azure Data Lake with the ABFS connector,
> but I want to perform an additional step by telling Azure Data Explorer to
> ingest the committed files, and I need their paths to do so. This is why
> I've implemented the hack below to reflectively get access to the
> underlying File, which I can then use to craft my ingestion command to
> Azure Data Explorer.
>
> On Tue, Oct 12, 2021 at 2:15 AM Fabian Paul 
> wrote:
>
>> Hi Preston,
>>
>> I just noticed I forgot to cc the user mailing list on my first reply
>> …. I have a few thoughts about the design you are describing.
>>
>>
>> In the meantime I have a nasty hack in place that has unblocked me for
>> now in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>>
>> InProgressFileWriter.PendingFileRecoverable recoverable =
>>     committable.getPendingFile();
>>
>> RecoverableWriter.CommitRecoverable commitRecoverable =
>>     ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
>>         recoverable).getCommitRecoverable();
>>
>> Method m = commitRecoverable.getClass().getMethod("targetFile");
>> m.setAccessible(true);
>> File targetFile = (File) m.invoke(commitRecoverable);
>>
>> I think a good place to start would be to introduce getTargetFile, and
>> getTempFile methods on the CommitRecoverable interface, though I haven't
>> fully studied the impact of this approach on other implementations of that
>> interface.
>>
>>
>> I might be missing context here, or lack knowledge of how Azure Data
>> Lake works, but why do you need access to the target and/or temp file
>> locations? Your scenario sounds very similar to any other distributed file
>> system.
>>
>
> For my case I need to know the final path to the finished files so I can
> issue an ingest command to Azure Data Explorer for each file once they're
> committed. When using Azure Data Lake for storage I can instruct Azure Data
> Explorer to ingest a file from a path in blob storage, but I need to know
> what the path is. Alternatively we may be able to leverage something like
> Event Grid which can send a signal whenever a new file lands in a
> particular path in Azure Data Lake, but there are benefits to having tight
> control over issuing the ingest commands.
>
>
>>
>>
>> A note on implementing our part-file scoped Encoder: The current Encoder
>> pattern in 1.14 assumes that the same encoder will work for all files, for
>> all time. We had to make numerous small changes to the File Sink to break
>> this pattern, and allow for an Encoder instance per part file. My current
>> solution uses a custom BucketID object with both Path, and EventType
>> properties. In our BucketWriter.openNew method we can use the
>> BucketId.EventType to lookup the Protobuf descriptor we need, create a new
>> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
>> a significant amount of the File Sink code to accomplish this as the File
>> Sink implementation assumes a String for BucketID and there are many
>> roadblocks put in place to prevent extending FileSink functionality.
>>
>>
>> This is an interesting point. I guess we did not think about such a use
>> case when developing the sink. Maybe we can approach the problem
>> differently. I am thinking about adding a context to the `Encoder#encode`
>> method where metadata (new bucket, filename, bucketname) is accessible.
>> Does this help in your case?

Unable to create connection to Azure Data Lake Gen2 with abfs: "Configuration property {storage_account}.dfs.core.windows.net not found"

2021-10-19 Thread Preston Price
Some details about my runtime/environment:
Java 11
Flink version 1.14.0
Running locally in IntelliJ

The error message that I am getting is: Configuration property
{storage_account}.dfs.core.windows.net not found.
Reading through all the docs hasn't yielded much help.

In the Flink docs, it's claimed that we can set the credentials for ABFS by
specifying the value in flink-conf.yaml, so this is what I am trying.
However, in the code path expressed in the stack trace, I don't believe the
configuration loaded from flink-conf.yaml is ever consulted. Here are the
relevant parts of the stack trace:
Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException:
Configuration property {storage_account}.dfs.core.windows.net not found.
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:372)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1133)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.(AzureBlobFileSystemStore.java:174)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
~[flink-core-1.14.0.jar:1.14.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
~[flink-core-1.14.0.jar:1.14.0]

When we first call org.apache.flink.core.fs.FileSystem.get, the
FileSystemFactories are initialized with a new/empty configuration, not the
configuration loaded from flink-conf.yaml. Therefore, later on when we're
calling AbstractAzureFSFactory.create we have an empty config, so the call
to HadoopConfigLoader.getOrLoadHadoopConfig() and then
HadoopConfigLoader.loadHadoopConfigFromFlink can't merge in our config from
flink-conf.yaml.

So if the Configuration loaded from flink-conf.yaml isn't supplied to the
AbstractAzureFSFactory, how do we configure Flink to connect to Azure Data
Lake?
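
One workaround I am experimenting with while running locally is to hand the
configuration to the FileSystem factories myself before building the job
graph. This is only a sketch of that idea (the account name and key below
are placeholders, and I have not confirmed this is the intended approach
for 1.14):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class AbfsLocalBootstrap {
    public static void main(String[] args) throws Exception {
        // What flink-conf.yaml would normally carry; both values are placeholders.
        Configuration conf = new Configuration();
        conf.setString(
                "fs.azure.account.key.<storage_account>.dfs.core.windows.net",
                "<account_key>");

        // Hand the configuration to Flink's FileSystem factories explicitly so
        // that AbstractAzureFSFactory sees it even when running from the IDE.
        FileSystem.initialize(conf, null);

        // ... build and execute the StreamExecutionEnvironment as usual ...
    }
}

If there is a supported way to get the same effect purely through
configuration, I'd prefer that.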

Thanks!


Re: Disable KafkaSourceReaderMetrics logs

2021-10-15 Thread Preston Price
There is an open bug for this here:
https://issues.apache.org/jira/browse/FLINK-24497
For log4j2 these settings worked for me:

# mute obnoxious warnings due to this bug: https://issues.apache.org/jira/browse/FLINK-24497
logger.flink_annoying_mute.name = org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics
logger.flink_annoying_mute.level = error


On Fri, Oct 15, 2021 at 8:08 AM Denis Nutiu  wrote:

> Hi,
>
> My Flink (1.14.0) job seems to output a lot of error messages with the
> following text:
>
> 16:46:38,562 WARN
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics []
> - Error when getting Kafka consumer metric "records-lag" for partition
> "lambada.events-0". Metric "pendingBytes" may not be reported correctly.
> java.lang.IllegalStateException: Cannot find Kafka metric matching current
> filter.
> at
> org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at java.util.Optional.orElseThrow(Optional.java:408) ~[?:?]
> at
> org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:304)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:229)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
> [?:?]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:228)
> [flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:187)
> [flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> I tried to disable the logs by adding the following line to
> log4j2.properties, but it did not work.
>
>
> log4j.logger.org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics=OFF
>
> Is there any other way to disable the messages?
> --
> Best,
> Denis
>


Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Preston Price
Okay, so topic discovery is possible with topic patterns, and maybe topic
lists. However, I don't believe it's possible to change the configured topic
list or topic pattern after the source is created.
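
For anyone following along, this is roughly how I understand the
pattern-based discovery is wired up with the new KafkaSource (a sketch from
my reading of the 1.14 builder API; the bootstrap servers, group id, and
pattern are placeholders):

import java.util.regex.Pattern;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TopicPatternExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("my-group")
                // The pattern is fixed once the source is built...
                .setTopicPattern(Pattern.compile("events-.*"))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // ...but new topics matching it should be picked up while the
                // job runs, as long as partition discovery is enabled.
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();
        env.execute();
    }
}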

On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu  wrote:

> There is a setting for dynamic topic discovery
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>
> Best,
>
> Denis
>
> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu 
> wrote:
>
>> Hi,
>>
>> In my experience with the librdkafka client and the Go wrapper, the
>> topic-pattern subscribe is reactive. The Flink Kafka connector might behave
>> similarly.
>>
>> Best,
>> Denis
>>
>> On Fri, Oct 15, 2021 at 12:34 AM Preston Price 
>> wrote:
>>
>>> No, the topic-pattern won't work for my case. Topics that I should
>>> subscribe to can be enabled/disabled based on settings I read from another
>>> system, so there's no way to craft a single regular expression that would
>>> fit the state of all potential topics. Additionally the documentation you
>>> linked seems to suggest that the regular expression is evaluated only once
>>> "when the job starts running". My understanding is it would not pick up new
>>> topics that match the pattern after the job starts.
>>>
>>>
>>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng 
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> I suppose you want to read from different topics every now and then?
>>>> Does the topic-pattern option [1] in Table API Kafka connector meet your
>>>> needs?
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>>>>
>>>> Preston Price wrote on Thu, Oct 14, 2021 at 1:34 AM:
>>>>
>>>>> The KafkaSource, and KafkaSourceBuilder appear to prevent users from
>>>>> providing their own KafkaSubscriber. Am I overlooking something?
>>>>>
>>>>> In my case I have an external system that controls which topics we
>>>>> should be ingesting, and it can change over time. I need to add, and
>>>>> remove topics as we refresh configuration from this external system
>>>>> without having to stop and start our Flink job. Initially it appeared I
>>>>> could accomplish this by providing my own implementation of the
>>>>> `KafkaSubscriber` interface, which would be invoked periodically as
>>>>> configured by the `partition.discovery.interval.ms` property. However
>>>>> there is no way to provide my implementation to the KafkaSource since
>>>>> the constructor for KafkaSource is package protected, and the
>>>>> KafkaSourceBuilder does not supply a way to provide the
>>>>> `KafkaSubscriber`.
>>>>>
>>>>> How can I accomplish a periodic refresh of the topics to ingest?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>
>> --
>> Regards,
>> Denis Nutiu
>>
>
>
> --
> Regards,
> Denis Nutiu
>


Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Preston Price
No, the topic-pattern won't work for my case. Topics that I should
subscribe to can be enabled/disabled based on settings I read from another
system, so there's no way to craft a single regular expression that would
fit the state of all potential topics. Additionally the documentation you
linked seems to suggest that the regular expression is evaluated only once
"when the job starts running". My understanding is it would not pick up new
topics that match the pattern after the job starts.


On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng  wrote:

> Hi!
>
> I suppose you want to read from different topics every now and then? Does
> the topic-pattern option [1] in Table API Kafka connector meet your needs?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>
> Preston Price wrote on Thu, Oct 14, 2021 at 1:34 AM:
>
>> The KafkaSource, and KafkaSourceBuilder appear to prevent users from
>> providing their own KafkaSubscriber. Am I overlooking something?
>>
>> In my case I have an external system that controls which topics we should
>> be ingesting, and it can change over time. I need to add, and remove topics
>> as we refresh configuration from this external system without having to
>> stop and start our Flink job. Initially it appeared I could accomplish this
>> by providing my own implementation of the `KafkaSubscriber` interface,
>> which would be invoked periodically as configured by the `
>> partition.discovery.interval.ms` property. However there is no way to
>> provide my implementation to the KafkaSource since the constructor for
>> KafkaSource is package protected, and the KafkaSourceBuilder does not
>> supply a way to provide the `KafkaSubscriber`.
>>
>> How can I accomplish a periodic refresh of the topics to ingest?
>>
>> Thanks
>>
>>
>>


How to refresh topics to ingest with KafkaSource?

2021-10-13 Thread Preston Price
The KafkaSource, and KafkaSourceBuilder appear to prevent users from
providing their own KafkaSubscriber. Am I overlooking something?

In my case I have an external system that controls which topics we should
be ingesting, and it can change over time. I need to add, and remove topics
as we refresh configuration from this external system without having to
stop and start our Flink job. Initially it appeared I could accomplish this
by providing my own implementation of the `KafkaSubscriber` interface,
which would be invoked periodically as configured by the `
partition.discovery.interval.ms` property. However there is no way to
provide my implementation to the KafkaSource since the constructor for
KafkaSource is package protected, and the KafkaSourceBuilder does not
supply a way to provide the `KafkaSubscriber`.
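
For reference, this is roughly the subscriber I would register if the
builder accepted one. It's only a sketch based on my reading of the 1.14
KafkaSubscriber interface, and ExternalTopicRegistry is a stand-in for the
external system mentioned above:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class ExternalConfigSubscriber implements KafkaSubscriber {

    /** Stand-in for the external system that decides which topics are enabled. */
    public interface ExternalTopicRegistry extends java.io.Serializable {
        Set<String> fetchEnabledTopics();
    }

    private final ExternalTopicRegistry registry;

    public ExternalConfigSubscriber(ExternalTopicRegistry registry) {
        this.registry = registry;
    }

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        Set<TopicPartition> subscribed = new HashSet<>();
        try {
            // Re-read the enabled topics on every discovery cycle.
            Set<String> enabledTopics = registry.fetchEnabledTopics();
            Map<String, TopicDescription> descriptions =
                    adminClient.describeTopics(enabledTopics).all().get();
            for (TopicDescription description : descriptions.values()) {
                description.partitions().forEach(info ->
                        subscribed.add(new TopicPartition(description.name(), info.partition())));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch topic metadata", e);
        }
        return subscribed;
    }
}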

How can I accomplish a periodic refresh of the topics to ingest?

Thanks


Re: Impossible to get pending file names/paths on checkpoint?

2021-10-12 Thread Preston Price
Thanks for your thoughts here Fabian, I've responded inline but I also want
to clarify the reason I need the file paths on commit.
The FileSink works as expected in Azure Data Lake with the ABFS connector,
but I want to perform an additional step by telling Azure Data Explorer to
ingest the committed files, and I need their paths to do so. This is why
I've implemented the hack below to reflectively get access to the
underlying File, which I can then use to craft my ingestion command to
Azure Data Explorer.

On Tue, Oct 12, 2021 at 2:15 AM Fabian Paul 
wrote:

> Hi Preston,
>
> I just noticed I forgot to cc the user mailing list on my first reply
> …. I have a few thoughts about the design you are describing.
>
>
> In the meantime I have a nasty hack in place that has unblocked me for now
> in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>
> InProgressFileWriter.PendingFileRecoverable recoverable =
>     committable.getPendingFile();
>
> RecoverableWriter.CommitRecoverable commitRecoverable =
>     ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
>         recoverable).getCommitRecoverable();
>
> Method m = commitRecoverable.getClass().getMethod("targetFile");
> m.setAccessible(true);
> File targetFile = (File) m.invoke(commitRecoverable);
>
> I think a good place to start would be to introduce getTargetFile, and
> getTempFile methods on the CommitRecoverable interface, though I haven't
> fully studied the impact of this approach on other implementations of that
> interface.
>
>
> I might be missing context here, or lack knowledge of how Azure Data Lake
> works, but why do you need access to the target and/or temp file locations?
> Your scenario sounds very similar to any other distributed file system.
>

For my case I need to know the final path to the finished files so I can
issue an ingest command to Azure Data Explorer for each file once they're
committed. When using Azure Data Lake for storage I can instruct Azure Data
Explorer to ingest a file from a path in blob storage, but I need to know
what the path is. Alternatively we may be able to leverage something like
Event Grid which can send a signal whenever a new file lands in a
particular path in Azure Data Lake, but there are benefits to having tight
control over issuing the ingest commands.
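
For completeness, here is the hack as a self-contained helper with the
imports spelled out (package names are from my reading of the 1.14 sources).
It leans on internal classes, so it can break on any upgrade, and the object
returned by targetFile() may differ between recoverable implementations (a
java.io.File locally, something else for Hadoop-style file systems), so this
variant just returns Object and leaves the conversion to the caller:

import java.lang.reflect.Method;

import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;

public final class CommittedFileResolver {

    private CommittedFileResolver() {}

    // Reflection is needed because targetFile() is not part of any public interface.
    public static Object resolveTargetFile(FileSinkCommittable committable) throws Exception {
        InProgressFileWriter.PendingFileRecoverable recoverable =
                committable.getPendingFile();

        RecoverableWriter.CommitRecoverable commitRecoverable =
                ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
                        recoverable).getCommitRecoverable();

        Method m = commitRecoverable.getClass().getMethod("targetFile");
        m.setAccessible(true);
        return m.invoke(commitRecoverable);
    }
}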


>
>
> A note on implementing our part-file scoped Encoder: The current Encoder
> pattern in 1.14 assumes that the same encoder will work for all files, for
> all time. We had to make numerous small changes to the File Sink to break
> this pattern, and allow for an Encoder instance per part file. My current
> solution uses a custom BucketID object with both Path, and EventType
> properties. In our BucketWriter.openNew method we can use the
> BucketId.EventType to lookup the Protobuf descriptor we need, create a new
> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
> a significant amount of the File Sink code to accomplish this as the File
> Sink implementation assumes a String for BucketID and there are many
> roadblocks put in place to prevent extending FileSink functionality.
>
>
> This is an interesting point. I guess we did not think about such a use case
> when developing the sink. Maybe we can approach the problem differently.
> I am thinking about adding a context to the `Encoder#encode` method where
> metadata (new bucket, filename, bucketname) is accessible. Does this help
> in your case?
>

Yes, this could have saved me a great deal of hassle if there were
additional context provided to the encoder about the lifecycle, and
BucketID of the underlying part file. It would still be a bit of a complex
Encoder as, for my case, each bucket needs to be encoded differently, and
state updated when files roll.
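
To make the idea concrete, something along these lines would cover my case.
This is purely a hypothetical API to illustrate the shape of the context I
have in mind, not anything that exists in Flink today:

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;

import org.apache.flink.core.fs.Path;

// Hypothetical variant of Encoder that also receives part-file metadata.
public interface ContextAwareEncoder<IN> extends Serializable {

    // Metadata about the part file currently being written.
    interface PartFileContext {
        String bucketId();        // for us this would carry the event type as well
        Path partFilePath();      // the in-progress part file's path
        boolean isNewPartFile();  // true right after a roll, so per-file state can be reset
    }

    void encode(IN element, OutputStream stream, PartFileContext context) throws IOException;
}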


> A perfect example of these roadblocks is the FileWriterBucketFactory
> interface. It looks like we could provide our own implementation of this
> interface, but the return type of its methods (FileWriterBucket) has
> default (package protected) visibility and so we can neither provide our
> own implementation, nor sub-class the return types to add our own logic.
> Another example is the OutputStreamBasedPartFileWriter which wraps a
> default (package protected) visibility abstract class
> (OutputStreamBasedBucketWriter). I ran into numerous issues like these.
>
>
> In general, all classes annotated with @Internal are not meant to be used
> outside of Flink, but I agree sometimes it becomes necessary. Although if
> more and more people need to reimplement big parts of the FileSink, we have
> to incorporate that feedback and make it extensible.
>

In my case the solution is then to reimplement all that great functionality
and it will make upgrading to future versions of Flink harder.


>
> A note on implementing our Azure Data Explorer sink: Currently we're
> looking to add code in a custom Committer to do this. However, since I
> can't grok a way to make the file commit + ADX ingest 

Impossible to get pending file names/paths on checkpoint?

2021-10-08 Thread Preston Price
I am trying to implement a File Sink that persists files to Azure Data
Lake, and then on commit I want to ingest these files to Azure Data
Explorer. Persisting the files is pretty trivial using the ABFS connector.

However, it does not appear to be possible to get any details about
names/paths to the pending files when they're committed. There are very few
details exposed in FileSinkCommittable, so I am currently blocked. The
paths to the files are needed when issuing ingest commands to the Azure
Data Explorer API. I have considered using automated ingestion for Azure
Data Explorer with EventHub but I need more control over the ingestion
commands for my use case.
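
To illustrate where I'm stuck, this is the rough shape of the committer I
want to write. AdxIngestClient is just a placeholder for the Azure Data
Explorer ingestion client, and resolveCommittedPath is exactly the piece I
cannot implement today because FileSinkCommittable doesn't expose the path:

import java.io.IOException;
import java.util.List;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.connector.file.sink.FileSinkCommittable;

public class AdxIngestingCommitter implements Committer<FileSinkCommittable> {

    /** Placeholder for the Azure Data Explorer ingestion client. */
    public interface AdxIngestClient {
        void ingestFromStorage(String path);
    }

    private final Committer<FileSinkCommittable> fileCommitter; // the FileSink's own committer
    private final AdxIngestClient adxClient;

    public AdxIngestingCommitter(Committer<FileSinkCommittable> fileCommitter,
                                 AdxIngestClient adxClient) {
        this.fileCommitter = fileCommitter;
        this.adxClient = adxClient;
    }

    @Override
    public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)
            throws IOException, InterruptedException {
        // Let the file sink move the pending files into place first.
        List<FileSinkCommittable> needRetry = fileCommitter.commit(committables);
        for (FileSinkCommittable committable : committables) {
            if (!needRetry.contains(committable)) {
                // The missing piece: no supported way to learn the committed file's path.
                adxClient.ingestFromStorage(resolveCommittedPath(committable));
            }
        }
        return needRetry;
    }

    private String resolveCommittedPath(FileSinkCommittable committable) {
        throw new UnsupportedOperationException(
                "FileSinkCommittable does not expose the committed file's path");
    }

    @Override
    public void close() throws Exception {
        fileCommitter.close();
    }
}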

I'm finding it very difficult to extend the functionality of the FileSink,
as many public classes and interfaces have private constructors or
package-protected return types, so I have to re-implement a significant
amount of these features to make minor changes.

Perhaps I'm pursuing this solution in the wrong way?
Thanks for any clues or guidance.