Re: PyFlink on EMR on EKS

2024-09-03 Thread Ahmed Hamdy
Hi Alexandre,
The error is complaining about loading the Python script: the local file
system uses the `file://` scheme rather than `local://`[1], so the `-py`
path should use the `file://` prefix. FYI, inside your Python script you can
add further dependencies such as connectors using PyFlink's dependency
management[2], which differs from Java's Maven/Gradle dependency management.
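
For illustration, a minimal sketch of attaching a Python dependency through
the API from [2] (the path below is only an example and assumes Person.py is
baked into the image under /opt/flink_job):

```python
# Hypothetical sketch (paths are examples): ship Person.py with the job via
# PyFlink's dependency management instead of relying on PYTHONPATH in the image.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Makes Person.py available to the Python workers at runtime.
env.add_python_file("/opt/flink_job/Person.py")
```

Files can also be attached at submission time with `-pyfs`/`--pyFiles`, while
the entry script itself would then be passed as
`-py file:///opt/flink_job/test.py` rather than `local:///...`.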

1-
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/filesystems/overview/#local-file-system
2-
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management
Best Regards
Ahmed Hamdy


On Mon, 2 Sept 2024 at 15:42, Alexandre KY 
wrote:

> Hello,
>
> I am trying to deploy my application on Amazon EMR on EKS. My application
> is in Python and from what I have read, I must create my own custom image.
> I tried to run a simple test with a very simple Flink job. Here is the
> Flink application:
>
> ```python
> # main.py
>
> import logging
> import sys
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink_proto.docker.aws.test.Person import Person
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     test = [
>         Person("Fred", 35),
>         Person("Wilma", 35),
>         Person("Pebbles", 2)
>     ]
>
>     flintstones = env.from_collection(test)
>
>     adults = flintstones.filter(
>         lambda p: p.age >= 18
>     )
>
>     adults.print()
>
>     # submit for execution
>     env.execute()
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO,
>                         format="%(message)s")
>
>     main()
> ```
>
> ```python
> # Person.py
>
> class Person:
>
>     def __init__(self, name, age):
>         self.name = name
>         self.age = age
>
>     def __str__(self):
>         return '{}: age {}'.format(self.name, str(self.age))
> ```
>
> Here is the Dockerfile used:
>
> ```dockerfile
> FROM public.ecr.aws/emr-on-eks/flink/emr-7.0.0-flink:latest
> USER root
> RUN pip3 install numpy apache-flink==1.18.1
> USER hadoop:hadoop
> ```
>
> When I try to execute the Flink job in application mode using native
> Kubernetes:
>
> ```shell
> $FLINK_HOME/bin/flink run-application \
>   --target kubernetes-application \
>   -Dkubernetes.namespace=$NAMESPACE \
>   -Dkubernetes.cluster-id=$EMR_CLUSTER \
>   -Dkubernetes.container.image.ref=$IMAGE \
>   -Dkubernetes.service-account=$FLINK_SERVICE_ACCOUNT \
>   -Djobmanager.heap.size=1024m \
>   -Dtaskmanager.memory.process.size=2048m \
>   -py local:///opt/flink_job/test.py
> ```
>
> I get the following error:
>
> ```shell
> ERROR org.apache.flink.client.python.PythonDriver  [] -
> Run python process failed
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'local'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems, please
> see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
> ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:291)
> ~[flink-python-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226)
> ~[flink-python-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487)
> ~[flink-python-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92)
> ~[flink-python-1.18.0-amzn-0.jar:1.18.0-amzn-0]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:?]
> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ~[flink-dis

Re: flink kafka sink batch mode delivery guaranties limitations

2024-08-23 Thread Ahmed Hamdy
Hi Nicolas,
Could you elaborate on what you think is missing?

I can see there is already a warning that the EXACTLY_ONCE sink won't work:

> It is important to remember that because there are no checkpoints, certain
> features such as CheckpointListener
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java//org/apache/flink/api/common/state/CheckpointListener.html>
> and, as a result, Kafka’s EXACTLY_ONCE
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance>
> mode or File Sink’s OnCheckpointRollingPolicy
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/filesystem/#rolling-policy>
> won’t work.
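
For a batch pipeline, a minimal PyFlink sketch along these lines (the broker
address and topic are placeholders, not from the original thread) would fall
back to AT_LEAST_ONCE explicitly:

```python
# Illustrative sketch: KafkaSink in BATCH mode with AT_LEAST_ONCE, since
# EXACTLY_ONCE relies on committing transactions on checkpoints, which
# never happen in batch execution. The Kafka connector jar must be available
# to the job, e.g. via env.add_jars().
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)  # bounded job, no checkpoints

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("broker:9092")  # placeholder address
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("output-topic")  # placeholder topic
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
)

env.from_collection(["a", "b", "c"], type_info=Types.STRING()).sink_to(sink)
env.execute()
```
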
Best Regards
Ahmed Hamdy


On Fri, 23 Aug 2024 at 11:48, Nicolas Paris 
wrote:

> hi
>
> From my tests, the Kafka sink with exactly-once delivery in the batch
> runtime never commits the transaction, so the semantic is not honoured.
> This is likely by design, since records are acked/committed during a
> checkpoint, which never happens in batch mode. Am I missing something, or
> should the documentation warn users?
>
> Resources:
>
> from
> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#boundedness
> > Kafka source is designed to support both streaming and batch running mode
>
> from
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
> > AT_LEAST_ONCE: The sink will wait for all outstanding records in the
> Kafka buffers to be acknowledged by the Kafka producer on a checkpoint
> > EXACTLY_ONCE: In this mode, the KafkaSink will write all messages in a
> Kafka transaction that will be committed to Kafka on a checkpoint.
>
> from
>
> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/execution_mode/#important-considerations
> > Unsupported in BATCH mode: Checkpointing and any operations that depend
> on checkpointing do not work.
>
>
>


Re: Getting Direct buffer memory. Errors with Kafka.

2024-08-23 Thread Ahmed Hamdy
Why do you believe it is an SSL issue?
The error trace looks like a memory issue; you could refer to the
TaskManager memory setup guide[1].

1-
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/

Best Regards
Ahmed Hamdy


On Fri, 23 Aug 2024 at 13:47, John Smith  wrote:

> I'm pretty sure it's not SSL. Is there a way to confirm, since the task
> does work? And/or are there other settings I can try?
>
> On Thu, Aug 22, 2024, 11:06 AM John Smith  wrote:
>
>> Hi, I'm getting this exception; a lot of resources online point to an SSL
>> misconfiguration.
>>
>> We are NOT using SSL, neither on the broker nor on the consumer side. Our
>> jobs work absolutely fine, as in the Flink task is able to consume from
>> Kafka, parse the JSON and then push it to the JDBC database sink.
>>
>> I would assume that if SSL were enabled on one side or the other, the
>> records would be completely mangled and unparsable from not being able to
>> encrypt/decrypt. Also, this seems to happen about once a week.
>>
>> 2024-08-22 10:17:09
>> java.lang.RuntimeException: One or more fetchers have encountered
>> exception
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>> at
>> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>> at
>> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>> at
>> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptR

Re: S3 schema for jar location?

2024-08-02 Thread Ahmed Hamdy
Hi Maxim,
You need to add the S3 filesystem to the Flink plugins directory of the
operator image to be able to work with S3; this works the same way as for
any other filesystem and mirrors how Flink itself loads filesystems.
Flink offers two S3 filesystem implementations:
- flink-s3-fs-hadoop[1] for the s3a:// scheme
- flink-s3-fs-presto[2] for the s3:// scheme

One easy way of doing so is to repackage the image yourself, for example:

```

FROM --platform=linux/amd64 apache/flink-kubernetes-operator:1.9.0
ENV FLINK_VERSION=1.17.2

## Download flink-s3-fs-hadoop-$FLINK_VERSION.jar (e.g. via curl from Maven
## Central [1]) into the build context first, then add it to the plugins dir.
ADD flink-s3-fs-hadoop-$FLINK_VERSION.jar /opt/flink/plugins/flink-s3-fs-hadoop/
```


1- https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop
2- https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-presto
Best Regards
Ahmed Hamdy


On Fri, 2 Aug 2024 at 00:05, Maxim Senin via user 
wrote:

> When will Flink Operator support schemas other than `local` for
> application deployment jar files? I just tried flink operator 1.9 and it’s
> still not working with `s3` locations. If s3 is good for savepoints and
> checkpoints, why can’t the jar also be on s3?
>
> Thanks,
> Maxim
>
> --
>
> COGILITY SOFTWARE CORPORATION LEGAL DISCLAIMER: The information in this
> email is confidential and is intended solely for the addressee. Access to
> this email by anyone else is unauthorized. If you are not the intended
> recipient, any disclosure, copying, distribution or any action taken or
> omitted to be taken in reliance on it, is prohibited and may be unlawful.
>


Re: Elasticsearch 8.x Connector in Maven

2024-07-31 Thread Ahmed Hamdy
Hi Rion,
It seems that ES 8 support landed ahead of the connector's 3.1 release[1],
which has not been released yet and hence is not published to Maven.
Given the importance of ES 8, and the fact that the Elasticsearch connector
still depends on Flink 1.18 while we are releasing 1.20, I would suggest
nudging the dev list[2] for a release manager to pick up the 3.1 release.


1-https://issues.apache.org/jira/browse/FLINK-26088
2-https://lists.apache.org/list.html?d...@flink.apache.org

Best Regards
Ahmed Hamdy


On Wed, 31 Jul 2024 at 15:06, Rion Williams  wrote:

> Hi again all,
>
> Just following up on this, as I’ve scoured around trying to find any
> documentation for using the ES 8.x connector, but everything only
> appears to reference 6.x/7.x.
>
> The ES 8.x connector seems to have been released for quite some time, so
> I’m curious how others are using it. I’d really like to avoid forking the
> bits I need into my local repository and building them on my own, if
> possible.
>
> Thanks in advance,
>
> Rion
>
> > On Jul 30, 2024, at 1:00 PM, Rion Williams 
> wrote:
> >
> > Hi all,
> >
> > I see that the Elasticsearch Connector for 8.x is supported per the repo
> (and completed JIRAs). Is there a way to reference this via Maven? Or is it
> required to build the connector from the source directly?
> >
> > We recently upgraded an Elasticsearch cluster to 8.x and some of the
> writes are now failing from the 7.x RHLC, so trying to upgrade to get to
> parity (and stop the jobs from crashing).
> >
> > Thanks in advance,
> >
> > Rion
>


Re: Event de duplication in flink with rabbitmq connector

2024-07-18 Thread Ahmed Hamdy
Yes. The current implementation doesn't use transactions on publish the way
the source uses them for acking and nacking deliveries. You can raise a
ticket with the community to support an exactly-once RMQSink, or implement
the logic yourself.
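
As an illustration only (this is not the connector's exactly-once logic, and
the state name, key and TTL below are assumptions), a keyed de-duplication
step in front of the sink could drop replays by remembering an event id in
keyed state:

```python
# Hypothetical sketch: drop duplicate events ahead of the (at-least-once)
# RMQ sink by remembering each event id in keyed state with a TTL.
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Deduplicate(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor("seen", Types.BOOLEAN())
        # Expire entries so the de-duplication state does not grow forever.
        ttl = StateTtlConfig \
            .new_builder(Time.hours(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
            .build()
        descriptor.enable_time_to_live(ttl)
        self.seen = runtime_context.get_state(descriptor)

    def process_element(self, value, ctx):
        if self.seen.value() is None:
            self.seen.update(True)
            yield value  # emit only the first occurrence per key


# Usage (event_id is a hypothetical field on your records):
# deduped = stream.key_by(lambda e: e.event_id).process(Deduplicate())
```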

> my checkpoints size is increasing. can this lead to state build up, as
> checkpoints might need to keep the state so windows can't purge them??


No. In theory, after a window is materialized its instance state is purged;
on checkpointing, only the currently active window states are recorded.
Checkpoints record a snapshot of the pipeline rather than the whole progress
of an operator, although some operators do have to accumulate state and
flush it on checkpoints (similar to the RMQSource in this case).
Debugging why your state keeps growing may need a deep dive into your data
flow and job performance to find bottlenecks, and experimenting with
different RocksDB compaction configurations.



Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 13:52, banu priya  wrote:

> Hi Ahmed,
>
> Thanks for the clarification. I see from the Flink documentation that Kafka
> sinks are transactional and de-duplication happens for them, but that is
> not applicable to the RMQ sink.
>
> But i have to use RMQ Sink only due to project requirements .
>
> I am facing one more issue, i.e. my checkpoint size is increasing. What I
> understand is that state is cleared after the tumbling window fires. I have
> a tumbling window (that uses processing time and triggers every 2s) and a
> checkpoint interval of 10s. Can this lead to state build-up, as checkpoints
> might need to keep the state so windows can't purge them?
>
>
> Thanks
> Banu
>
>
>
>
> On Thu, 18 Jul, 2024, 5:55 pm Ahmed Hamdy,  wrote:
>
>> Hi Banu,
>> Yes, regarding the RMQSource, it only acknowledges during checkpoint
>> completion. All the messages received after a checkpoint, up until the next
>> checkpoint completes, are grouped and acknowledged together, whether that
>> happens during the minimum pause or at the start of the next checkpoint. A
>> failure during this period will have these unacked messages reprocessed.
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Thu, 18 Jul 2024 at 13:20, banu priya  wrote:
>>
>>> Hi Ahmed,
>>>
>>> Thanks a lot for your reply.
>>>
>>> I am planning keep both window time and check point interval same ie 10s.
>>>
>>> Minimum pause between check point is 5s. What happens to the events that
>>> are received during this time??
>>>
>>> Will it be acknowledged at the end of next checkpoint?
>>>
>>> Thanks
>>> Banu
>>>
>>>
>>> On Thu, 18 Jul, 2024, 5:34 pm Ahmed Hamdy,  wrote:
>>>
>>>> Hi Banu,
>>>> This behavior of source is expected, the guarantee of the RMQSource is
>>>> exactly once which is achieved by acknowledging envelopes on checkpoints
>>>> hence the source would never re-read a message after checkpoint even if it
>>>> was still inside the pipeline and not yet passed to sink, eager
>>>> acknowledgment causes risk of data loss on failure and restoring from a
>>>> previous checkpoint hence breaking all delivery guarantees.
>>>> In concept there is no guarantee that a Flink pipeline achieves end to
>>>> end exactly once without an exactly once sink as well (which is not the
>>>> case for RMQSink).
>>>> In your case, reprocessing is bound by the checkpoint interval which is
>>>> 5 minutes, you can make it tighter if it suits your case better.
>>>>
>>>> Best Regards
>>>> Ahmed Hamdy
>>>>
>>>>
>>>> On Thu, 18 Jul 2024 at 11:37, banu priya  wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Gentle reminder about bow query.
>>>>>
>>>>> Thanks
>>>>> Banu
>>>>>
>>>>> On Tue, 9 Jul, 2024, 1:42 pm banu priya,  wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a Flink job with a RMQ source, tumbling windows (fires for
>>>>>> each 2s), an aggregator, then a RMQ sink. Incremental RocksDB 
>>>>>> checkpointing
>>>>>> is enabled with an interval of 5 minutes.
>>>>>>
>>>>>> I was trying to understand Flink failure recovery. My checkpoint X is
>>>>>> started, I have sent one event to my source. As windows are triggered 
>>>>>> every
>>>>>> 2s, my sink is updated with the aggregated result. But when I checked the
>>>>>> RabbitMQ console, my source queue still had unacked messages. (It is
>>>>>> expected and it is as per design
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>>>>>> ).
>>>>>>
>>>>>> Now I restarted my task manager, as restart happens within the same
>>>>>> checkpoint interval and checkpoint X has not yet completed. The message 
>>>>>> is
>>>>>> not acknowledged and is sent again. Duplicate processing of events 
>>>>>> happens.
>>>>>>
>>>>>> How to avoid these duplicates?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Banu
>>>>>>
>>>>>


Re: Event de duplication in flink with rabbitmq connector

2024-07-18 Thread Ahmed Hamdy
Hi Banu,
This behavior of the source is expected. The guarantee of the RMQSource is
exactly-once, which is achieved by acknowledging envelopes on checkpoints;
hence the source never re-reads a message after a checkpoint, even if it was
still inside the pipeline and had not yet reached the sink. Eager
acknowledgment risks data loss on failure and restore from a previous
checkpoint, which would break all delivery guarantees.
In principle, there is no guarantee that a Flink pipeline achieves end-to-end
exactly-once without an exactly-once sink as well (which is not the case
for the RMQSink).
In your case, reprocessing is bounded by the checkpoint interval, which is 5
minutes; you can make it tighter if that suits your case better.

Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 11:37, banu priya  wrote:

> Hi All,
>
> Gentle reminder about the query below.
>
> Thanks
> Banu
>
> On Tue, 9 Jul, 2024, 1:42 pm banu priya,  wrote:
>
>> Hi All,
>>
>> I have a Flink job with a RMQ source, tumbling windows (fires for each
>> 2s), an aggregator, then a RMQ sink. Incremental RocksDB checkpointing is
>> enabled with an interval of 5 minutes.
>>
>> I was trying to understand Flink failure recovery. My checkpoint X is
>> started, I have sent one event to my source. As windows are triggered every
>> 2s, my sink is updated with the aggregated result. But when I checked the
>> RabbitMQ console, my source queue still had unacked messages. (It is
>> expected and it is as per design
>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>> ).
>>
>> Now I restarted my task manager, as restart happens within the same
>> checkpoint interval and checkpoint X has not yet completed. The message is
>> not acknowledged and is sent again. Duplicate processing of events happens.
>>
>> How to avoid these duplicates?
>>
>>
>> Thanks
>>
>> Banu
>>
>


Re: DynamoDB Table API Connector Failing On Row Deletion - "The provided key element does not match the schema"

2024-05-31 Thread Ahmed Hamdy
Hi Rob,
I agree with the issue here as well as the proposed solution. Thanks a lot
for the deep dive and the reproduction steps.
I have created a ticket on your behalf:
https://issues.apache.org/jira/browse/FLINK-35500
You can comment on it if you intend to work on it and then submit a PR
against the https://github.com/apache/flink-connector-aws repo. One of the
committers should be able to review it then.

Best Regards
Ahmed Hamdy


On Fri, 31 May 2024 at 00:55, Rob Goretsky 
wrote:

> Hello!
>
> I am looking to use the DynamoDB Table API connector to write rows to AWS
> DynamoDB.  I have found what appears to be a bug in the implementation for
> Row Delete operations.   I have an idea on what needs to be fixed as well,
> and given that I am new to this community, I am looking for guidance on how
> to get an official JIRA created for this issue and then potentially
> contribute a fix via a GitHub PR.
>
> The issue is that, for a Deletion request, DynamoDB expects to get a
> request containing *only *the primary key fields, but the Connector is
> including ALL fields in the Row with the Delete request, leading to the
> runtime error message:
>
> org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
> The provided key element does not match the schema
>
> I believe the root cause here is line 275 in DynamoDbSinkWriter.java as
> seen here -
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
>
>
> This line seems to be setting the "key" for the DeleteRequest to the full
> set of attributes for the Row, rather than only the key field(s).
>
> To replicate this issue, one can take the following steps:
>
> (1) Create a new DynamoDB table in AWS.  Command line:
> aws dynamodb create-table \
>   --table-name orders \
>   --attribute-definitions AttributeName=userId,AttributeType=S \
>   --key-schema AttributeName=userId,KeyType=HASH \
>   --billing-mode PAY_PER_REQUEST
>
> (2) Create an input file in Debezium-JSON format with the following rows
> to start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
>
> (3) Start the Flink SQL Client, and run the following, substituting in the
> proper local paths for the Dynamo Connector JAR file and for this local
> sample input file:
>
> ADD JAR
> '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
>
> CREATE TABLE Orders_CDC(
>   orderId BIGINT,
>   price float,
>   userId STRING
>  ) WITH (
>'connector' = 'filesystem',
>'path' = '/path/to/input_file.jsonl',
>'format' = 'debezium-json'
>  );
>
> CREATE TABLE Orders_Dynamo (
>   orderId BIGINT,
>   price float,
>   userId STRING,
>   PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
>   'connector' = 'dynamodb',
>   'table-name' = 'orders',
>   'aws.region' = 'us-east-1'
> );
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
>
> (4) At this point, we will see that things currently all work properly,
> and these 4 rows are inserted properly to Dynamo, because they are "Insert"
> operations.   So far, so good!
>
> (5) Now, add the following row to the input file.  This represents a
> deletion in Debezium format, which should then cause a Deletion on the
> corresponding DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
>
> (6) Re-Run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
>
> (7) You will now see a failure in the job with the error message "The
> provided key element does not match the schema".   This is happening
> because the Flink DynamoDB connector is attempting to issue a Delete
> request to Dynamo while passing all fields (orderId, price, and userId),
> when only the orderId should be passed in a delete request.
>
> I believe we can fix this by adjusting DynamoDbSinkWriter.java to
> leverage the knowledge it has about partition keys (already present in the
> "overwriteByPartitionKeys" field) so that it only passes along the partition
> key fields in any Delete request.   Does this sound like a good approach?
>
> If so, as I am new to this community, please let me know the proper
> procedure to get this flagged as an official issue within JIRA and then to
> get a Pull Request reviewed/approved, and I could start working on one.
>
> Thanks,
> Rob
>
>
>


Re: Flink kafka connector for v 1.19.0

2024-05-10 Thread Ahmed Hamdy
Hi Aniket

The community is currently working on releasing new versions of all the
connectors that are compatible with 1.19. Please follow the announcements on
the Flink website[1] to get notified when they are available.

1-https://flink.apache.org/posts/
Best Regards
Ahmed Hamdy


On Fri, 10 May 2024 at 18:14, Aniket Sule 
wrote:

> Hello,
>
> On the Flink downloads page, the latest stable version is Flink 1.19.0.
> However, the Flink Kafka connector is v 3.1.0, that is compatible with
> 1.18.x.
>
> Is there a timeline when the Kafka connector for v 1.19 will be released?
> Is it possible to use the v3.1.0 connector with Flink v 1.19?
>
>
>
> Thanks and regards,
>
> Aniket Sule
> Caution: External email. Do not click or open attachments unless you know
> and trust the sender.
>


Re: Apache Flink-Redis Connector Depreciated In New Version | Adsolut Media

2024-05-09 Thread Ahmed Hamdy
Hi Kush,
Unfortunately there is currently no Redis connector maintained by the Flink
community. I am aware that Bahir's version might be outdated, but we are
currently working on a community-supported connector[1].

1-https://github.com/apache/flink-connector-redis-streams
Best Regards
Ahmed Hamdy


On Thu, 9 May 2024 at 12:30, Kush Rohra  wrote:

> Hello Team Apache Flink,
>
>
>
> Hope you are doing well.
>
>
>
> Introduction : My Name is Kush Rohra Co-Founder and CTO of Adsolut Media
> <https://adsolut.in/>. We are an Ad-Tech company serving almost 200+
> clients in ad-tech space by showing them “ads” and providing them real-time
> analytics platform to review the performance.
>
>
>
> My team is currently using Apache Flink v1.16 for data streaming, where
> Apache Kafka produces the data and Flink consumes it. Currently we are
> using two Flink instances: one for Flink-Mins data and another for
> Flink-Hours data. To show the real-time graph for every minute, we consume
> the data from Flink-Hours through a Redis cache, using the Flink-Redis
> connector. However, with the newer version of Flink I can see that the
> Flink-Redis connector is deprecated.
>
>
>
> However, can you please help/guide us: if we have to store the Flink data
> in a cache, which connector is more efficient and stable to use apart from
> Redis?
>
>
>
> Please find attached architecture of our web-app.
>
>
>
> Looking forward to hear from you.
>
>
>
> Thanks
>
> Kush Rohra
>


Re: Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Ahmed Hamdy
Hi Michael,
Unfortunately the new `KinesisDataStreamsSink` doesn't support aggregation
yet.
My suggestion, if you want to use native Kinesis aggregation, is to use the
latest connector version that supports the KPL as a sink for the Table API,
which would be 1.14.x; you could package the connector from that version.

> was thinking about moving the logic to use window functions (either in
> the Table or DataStream API), but here I think I'd need to "close" the
> window based not only on time, but also on record number

Regarding this approach, a better alternative might be a custom process
function that holds batches of records in state and emits an aggregated
record. However, that would not be consistent with KPL aggregation, and the
records could not be de-aggregated on the consumer side, so I would advise
against taking this approach.


Best Regards
Ahmed Hamdy


On Mon, 29 Apr 2024 at 11:14, Michael Marino 
wrote:

> Hi all,
>
> We are currently using Flink 1.18.1 (AWS Managed Flink) and are writing to
> Kinesis streams in several of our applications using the Table API.
>
> In our use case, we would like to be able to aggregate multiple records
> (rows) together and emit them in a single Kinesis record.
>
> As far as I understand, with the usage of the Async Writer it is not
> possible to aggregate records (Table rows) together into a single record as
> was possible previously with the Kinesis Producer Library.
>
> I wanted to ask if anyone here has any suggestions of how to do this, or
> perhaps if I missed it somewhere in the documentation? I was thinking about
> moving the logic to use window functions (either in the Table or DataStream
> API), but here I think I'd need to "close" the window based not only on
> time, but also on record number. Anyways, any thoughts are appreciated!
>
> Thanks,
> Mike
>
> --
>
> Michael Marino
>
> Principal Data Science & Analytics
>
> Phone:  +49 89 7167786 - 14
>
> linkedin.com/company/tadogmbh <https://www.linkedin.com/company/tadogmbh>
> | facebook.com/tado <http://www.facebook.com/tado> | twitter.com/tado
> <http://www.twitter.com/tado> | youtube.com/tado
> <http://www.youtube.com/tado>
>
> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>
>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
> Schwarz | Josef Wenzl
>
> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
> 280012558
>


Re: Discussion thread : Proposal to add Conditions in Flink CRD's Status field

2024-03-27 Thread Ahmed Hamdy
I am sorry it should be "d...@flink.apache.org"
Best Regards
Ahmed Hamdy


On Wed, 27 Mar 2024 at 13:00, Ahmed Hamdy  wrote:

> Hi Lajith,
> Could you please open the discussion thread against "d...@apache.flink.org",
> I believe it is better suited there.
> Best Regards
> Ahmed Hamdy
>
>
> On Wed, 27 Mar 2024 at 05:33, Lajith Koova  wrote:
>
>> 
>>
>> Hello,
>>
>>
>> Starting discussion thread here to discuss a proposal to add Conditions
>> field in the CR status of Flink Deployment and FlinkSessionJob.
>>
>>
>> Here is the google doc with details. Please provide your thoughts/inputs.
>>
>>
>>
>>
>> https://docs.google.com/document/d/12wlJCL_Vq2KZnABzK7OR7gAd1jZMmo0MxgXQXqtWODs/edit?usp=sharing
>>
>>
>> Thanks
>>
>>


Re: Discussion thread : Proposal to add Conditions in Flink CRD's Status field

2024-03-27 Thread Ahmed Hamdy
Hi Lajith,
Could you please open the discussion thread against "d...@apache.flink.org"?
I believe it is better suited there.
Best Regards
Ahmed Hamdy


On Wed, 27 Mar 2024 at 05:33, Lajith Koova  wrote:

> 
>
> Hello,
>
>
> Starting discussion thread here to discuss a proposal to add Conditions
> field in the CR status of Flink Deployment and FlinkSessionJob.
>
>
> Here is the google doc with details. Please provide your thoughts/inputs.
>
>
>
> https://docs.google.com/document/d/12wlJCL_Vq2KZnABzK7OR7gAd1jZMmo0MxgXQXqtWODs/edit?usp=sharing
>
>
> Thanks
>
>