Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-06 Thread Yingjie Cao
Hi all,

Thanks very much for all of the feedback. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao wrote on Wednesday, January 5, 2022 at 16:08:

> Many thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> changing the default config values would largely improve the out-of-the-box
> experience of Flink batch, so it seems worth changing from my side,
> even if it would cause some compatibility issues under some scenarios. In
> addition, if we finally have to break compatibility, we might as well do it
> early to avoid affecting more users.
>
> Best,
> Yun
>
> --
> From:刘建刚 
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh 
> Cc:dev ; user 
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
> Yingjie Cao wrote on Tuesday, January 4, 2022 at 17:35:
>
> > Hi all,
> >
> > After running some tests with the proposed default value (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like to
> > share some test results.
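
For reference, a minimal sketch of setting the proposed values explicitly (e.g. to try them out before the defaults change); this is illustrative only and was not part of the tests described here:

import org.apache.flink.configuration.Configuration

object ProposedShuffleDefaults {
  // Sketch only: a Configuration carrying the proposed values listed above;
  // the same keys/values can equally be placed in flink-conf.yaml.
  def apply(): Configuration = {
    val conf = new Configuration()
    conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "1")
    conf.setString("taskmanager.network.sort-shuffle.min-buffers", "512")
    conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m")
    conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true")
    conf
  }
}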
> >
> > 1. TPC-DS performance and stability test (I ran the TPC-DS benchmark using
> 512
> > default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query stability a lot. With the current default, there
> > are many queries suffering from blocking shuffle relevant failures. With
> > the proposed default values, only three queries fail because of the
> > "Insufficient number of network buffers:" error. With 512 parallelism,
> the
> > current default configuration will incur the same issue. Part of the
> reason
> > is that the network buffers consumed by an InputGate are proportional to
> > parallelism and can use 32M of network memory by default; many tasks have
> > several InputGates, but we only have 128M of network memory per TaskManager
> > by default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query performance a lot. Except for those queries with
> > a small amount of shuffle data, which take a really short time, the proposed
> > default values can bring a 2-10x performance gain. About the default
> > value of taskmanager.network.sort-shuffle.min-parallelism proposed by
> > Yun, I tested both 1 and 128, and 1 is better for performance, which is as
> > expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default values more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers:" error for several batch test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for these cases.
> After
> > increasing the total network memory size for these test cases, the issue can be
> > solved.
> >
> > Summary:
> > 1. The proposed default value can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail because of the "Insufficient number of
> network
> > buffers:" error for this default value change will increase the network
> > buffer consumption a little for jobs less than 512 parallelism (for jobs
> > more than 512 parallelism network buffer consumption will be reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> > better performance than setting it to 128; both settings may incur the
> > "Insufficient number of network buffers:" error.
> > 4. After changing the default value and fixing several test cases, all
> > Flink tests (except for those known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs to fail because of the "Insufficient number of network buffers:"
> > error, the possibility is small enough (only 3 out of about 100 TPC-DS queries
> > fail, and these queries will also fail with the current default
> configuration
> > because it is the InputGate which takes the most network buffers and causes
> > the error). Compared to this small regression, the performance and
> > stability gains are big. Any feedback, especially from Flink batch
> > users, is highly appreciated.
> >
> > BTW, aside from the above tests, I also tried to tune some more config
> > options to try to make the TPC-DS test faster. I copied these tuned
> config
> > options from our daily TPC-DS settings. The results show that the
> optimized
> > configuration can improve the TPC-DS performance by about 30%. Though these
> > settings may not be the best, they really help compared to the default
> values.
> > I 


Re: Flink versions release 1.14.3

2022-01-06 Thread Martijn Visser
Hi Deepti,

You can track the progress of Flink 1.14.3 by following this thread [1]
It should be released in the next few weeks.

Best regards,

Martijn

[1] https://lists.apache.org/thread/24v8bh3jww7c5bvfgov9cp5mb0wtj7tk

On Fri, 7 Jan 2022 at 07:30, Deepti Sharma S 
wrote:

> Hello Team,
>
>
>
> Can you please confirm, when we have Flink version release 1.14.3 which
> has Log4J version 2.17?
>
>
>
>
>
> Regards,
>
> Deepti Sharma
> * PMP® & ITIL*
>
>
>


Re: reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-06 Thread Martin
Hi,
typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."-> I run it of course via standalone kubernetes deployment, to make elastic scaling possible.
BRMartin
 

mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):

Hi,
I have a job where I do a keyBy'd process function (a), then downstream a normal process function (b), and then I use reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
The last keyed process function uses keyed state for an increasing sequence number.
I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.
I use Flink 1.14.2.
I test that job on four use cases: (1) static parallelism, (2) scale-out, (3) scale-in, (4) task manager fail*.
* via kill -SIGTERM inside the container for the flink JVM
For test use cases (1) and (2) everything is fine.
For test use cases (3) and (4), the state of the keyed process function (c) seems to be available for only around 50% of the events processed after the scale-in/fail.
Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?
If yes, are there already any ideas what the root cause could be?
 
BR
Martin
 



Flink versions release 1.14.3

2022-01-06 Thread Deepti Sharma S
Hello Team,

Can you please confirm when we will have the Flink 1.14.3 release, which has
Log4j version 2.17?


Regards,
Deepti Sharma
PMP® & ITIL




Flink upgrade and rollback query

2022-01-06 Thread Deepti Sharma S
Hello Team,

 

We are using Flink 1.13.2 and we have provided Helm Chart to deploy Flink on
K8S environment.

 

What are the recommended procedures to upgrade and roll back Flink, i.e.,
if we want to upgrade Flink from 1.13.2 to 1.14.x?

 

We are looking for procedures with minimal or zero down time.

 

 

 

Regards,

Deepti Sharma
PMP® & ITIL



 





Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-06 Thread Flink Lover
I tried Flink version 1.14.2 / 1.13.5

On Fri, Jan 7, 2022 at 11:46 AM Flink Lover  wrote:

> Also, I am using flink-connector-kafka_2.11
>
> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
> SimpleStringSchema(), properties)
>
>
> val myProducer = new FlinkKafkaProducer[String](
>   "topic_name", // target topic
>   new KeyedSerializationSchemaWrapper[String](new
> SimpleStringSchema()), // serialization schema
>   getProperties(), // producer config
>   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>
>
>
> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover 
> wrote:
>
>> Hi All,
>>
>> I checked the Java version using the java -version on the terminal and it
>> gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>> which is by default.
>>
>> [image: image.png]
>>
>> What do you mean by target jvm? Also, what I am trying to achieve is
>> correct? about the windows?
>>
>> Thanks,
>> Martin
>>
>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren  wrote:
>>
>>> Hi Martin,
>>>
>>> Can you provide the configuration of your Kafka producer and consumer?
>>> Also it’ll be helpful to have the complete code of your DataStream.
>>>
>>> About the error you mentioned, I suspect that the JDK version you actually
>>> use is below 1.8. Can you double-check the environment
>>> that your job is running in?
>>>
>>> Cheers,
>>>
>>> Qingsheng Ren
>>>
>>>
>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover 
>>> wrote:
>>> >
>>> > Hello Folks!
>>> >
>>> > I have a DataStream which sends data to the consumer but I got the
>>> data once the code completed its execution. I didn't receive the records as
>>> the code was writing it to the topic. I was able to achieve this behavior
>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>> the trick.  My expectation is Producer should batch the records of 2
>>> seconds and send it to the consumer and consumer should receive a batch of
>>> 2 seconds. My code goes as below:
>>> >
>>> > Producer Side:
>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>> >   WatermarkStrategy
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> > dataToKafka.addSink(myProducer).uid("source")
>>> >
>>> > Consumer Side:
>>> > consumer.assignTimestampsAndWatermarks(
>>> >   WatermarkStrategy
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > Now this gives me an error as below:
>>> >
>>> > Static methods in interface require -target:jvm-1.8
>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>> >
>>> > Thanks,
>>> > Martin.
>>> >
>>> >
>>> >
>>>
>>>


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-06 Thread Flink Lover
Also, I am using flink-connector-kafka_2.11

val consumer = new FlinkKafkaConsumer[String]("topic_name", new
SimpleStringSchema(), properties)


val myProducer = new FlinkKafkaProducer[String](
  "topic_name", // target topic
  new KeyedSerializationSchemaWrapper[String](new
SimpleStringSchema()), // serialization schema
  getProperties(), // producer config
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)



On Fri, Jan 7, 2022 at 11:43 AM Flink Lover  wrote:

> Hi All,
>
> I checked the Java version using the java -version on the terminal and it
> gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
> which is by default.
>
> [image: image.png]
>
> What do you mean by target jvm? Also, what I am trying to achieve is
> correct? about the windows?
>
> Thanks,
> Martin
>
> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren  wrote:
>
>> Hi Martin,
>>
>> Can you provide the configuration of your Kafka producer and consumer?
>> Also it’ll be helpful to have the complete code of your DataStream.
>>
>> About the error you mentioned, I suspect that the JDK version you actually
>> use is below 1.8. Can you double-check the environment
>> that your job is running in?
>>
>> Cheers,
>>
>> Qingsheng Ren
>>
>>
>> > On Jan 7, 2022, at 1:13 AM, Flink Lover  wrote:
>> >
>> > Hello Folks!
>> >
>> > I have a DataStream which sends data to the consumer but I got the data
>> once the code completed its execution. I didn't receive the records as the
>> code was writing it to the topic. I was able to achieve this behavior using
>> AT_LEAST_ONCE property but I decided to implement Watermarks. I haven't
>> enabled checkpointing as of now. I know checkpointing will also do the
>> trick.  My expectation is Producer should batch the records of 2 seconds
>> and send it to the consumer and consumer should receive a batch of 2
>> seconds. My code goes as below:
>> >
>> > Producer Side:
>> >  dataToKafka.assignTimestampsAndWatermarks(
>> >   WatermarkStrategy
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> > dataToKafka.addSink(myProducer).uid("source")
>> >
>> > Consumer Side:
>> > consumer.assignTimestampsAndWatermarks(
>> >   WatermarkStrategy
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > Now this gives me an error as below:
>> >
>> > Static methods in interface require -target:jvm-1.8
>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>> >
>> > Thanks,
>> > Martin.
>> >
>> >
>> >
>>
>>


Re: Trigger the producer to send data to the consumer after mentioned seconds

2022-01-06 Thread Flink Lover
Hi All,

I checked the Java version using java -version on the terminal and it
gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only,
which is the default.

[image: image.png]

What do you mean by target JVM? Also, is what I am trying to achieve
correct, regarding the windows?

Thanks,
Martin

On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren  wrote:

> Hi Martin,
>
> Can you provide the configuration of your Kafka producer and consumer?
> Also it’ll be helpful to have the complete code of your DataStream.
>
> About the error you mentioned, I suspect that the JDK version you actually
> use is below 1.8. Can you double-check the environment
> that your job is running in?
>
> Cheers,
>
> Qingsheng Ren
>
>
> > On Jan 7, 2022, at 1:13 AM, Flink Lover  wrote:
> >
> > Hello Folks!
> >
> > I have a DataStream which sends data to the consumer but I got the data
> once the code completed its execution. I didn't receive the records as the
> code was writing it to the topic. I was able to achieve this behavior using
> AT_LEAST_ONCE property but I decided to implement Watermarks. I haven't
> enabled checkpointing as of now. I know checkpointing will also do the
> trick.  My expectation is Producer should batch the records of 2 seconds
> and send it to the consumer and consumer should receive a batch of 2
> seconds. My code goes as below:
> >
> > Producer Side:
> >  dataToKafka.assignTimestampsAndWatermarks(
> >   WatermarkStrategy
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> > dataToKafka.addSink(myProducer).uid("source")
> >
> > Consumer Side:
> > consumer.assignTimestampsAndWatermarks(
> >   WatermarkStrategy
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > Now this gives me an error as below:
> >
> > Static methods in interface require -target:jvm-1.8
> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > My scala version is 2.11.12 and Java JDK 1.8.0.281
> >
> > Thanks,
> > Martin.
> >
> >
> >
>
>


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
Hi Francis,

What I am trying to do is you can see over here
https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375


On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy 
wrote:

> Hi Siddhesh,
>
> How are you getting this tuple of strings into the system? I think this is
> the important question, you can create a DataStream in many ways, from a
> collection, from a source, etc but all of these rely on the
> ExecutionEnvironment you're using.
> A RichMapFunction doesn't produce a datastream directly, it's used in the
> context of the StreamExecutionEnvironment to create a stream i.e.
> DataStream.map([YourRichMapFunction]) this implies that you already need a
> datastream to transform a datastream using a mapFunction
> (MapFunction/RichMapFunction)
> Francis
>
> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> Hi,
>>
>> As I am new and I am facing one issue so I came across RichMapFunction.
>> How can I use RichMapFunction to convert a tuple of strings to datastream?
>> If not how can I do it apart from using StreamExecutionEnvironment?
>>
>> Thanks,
>> Sid
>>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


RowType for complex types in Parquet File

2022-01-06 Thread Meghajit Mazumdar
Hello,

Flink documentation mentions this as to how to create a FileSource for reading Parquet files.
For primitive parquet types like BINARY and BOOLEAN, I am able to create a
RowType and read the fields.

However, I have some nested fields in my parquet schema also like this
which I want to read :

  optional group location = 11 {
optional double latitude = 1;
optional double longitude = 2;
  }

How can I create a RowType for this ? I did something like this below, but
I got an exception `Caused by: java.lang.UnsupportedOperationException:
Complex types not supported`

RowType nestedRowType = RowType.of(new LogicalType[] {new
DoubleType(), new DoubleType()}, new String[]{"latitude", "longitude"});
final LogicalType[] fieldTypes = new
LogicalType[]{nestedRowType};
final ParquetColumnarRowInputFormat format =
new ParquetColumnarRowInputFormat<>(
new Configuration(),
RowType.of(fieldTypes, new
String[]{"location"}),
500,
false,
true);


pyflink mixed with Java operators

2022-01-06 Thread Francis Conroy
Hi all,

Does anyone know if it's possible to specify a java map function at some
intermediate point in a pyflink job? In this case

SimpleCountMeasurementsPerUUID

is a flink java MapFunction. The reason we want to do this is that
performance in pyflink seems quite poor.
e.g.

import logging
import os
import sys
import zlib

import Measurements_pb2
from pyflink.common import Types
from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
    CheckpointingMode
from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink

from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
from customisations.serialisation import ZlibDeserializationSchema


class ZlibDecompressor(MapFunction):
def map(self, value):
decomp = zlib.decompress(value[1])
return value[0], decomp


class MeasurementSnapshotCountMapFunction(MapFunction):
def map(self, value):
pb_body = Measurements_pb2.MeasurementSnapshot()
pb_body.ParseFromString(value)
meas_count = len(pb_body.measurements)
if meas_count > 0:
first_measurement = pb_body.measurements[0]
point_uuid = first_measurement.point_uuid.value
timestamp = first_measurement.time

return timestamp, point_uuid, meas_count

return None


def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
    jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
env.add_jars(jarpath)
auto_load_python_files(env)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# write all the data to one file
env.set_parallelism(1)
env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

connection_config = RMQConnectionConfig.Builder() \
.set_host("rabbitmq") \
.set_port(5672) \
.set_virtual_host("/") \
.set_user_name("guest") \
.set_password("guest") \
.set_connection_timeout(60) \
.set_prefetch_count(5000) \
.build()

deserialization_schema = ZlibDeserializationSchema()

stream = env.add_source(RMQSource(
connection_config,
"flink-test",
False,
deserialization_schema,
)).set_parallelism(1)

# compute word count
    dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
        .key_by(KeyByUUID(), key_type=Types.STRING()) \
        .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # Hypothetical

kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
.set_value_serialization_schema(SimpleStringSchema()) \
.set_topic("flink-test-kafka") \
.build()

dstream.sink_to(
KafkaSink.builder() \
.set_record_serializer(kafka_serialisation_schema) \
.set_bootstrap_servers("kafka:9092") \
.build()
)

# submit for execution
env.execute()


if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format="%(message)s")
word_count()

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: Flink native k8s integration vs. operator

2022-01-06 Thread Xintong Song
Hi folks,

Thanks for the discussion. I'd like to share my two cents on this topic.

Firstly, I'd like to clarify my understanding of the concepts "native k8s
integration" and "active resource management".
- Native k8s integration means Flink's master interacts with k8s' api
server directly. It acts like embedding an operator inside Flink's master,
which manages the resources (pod, deployment, configmap, etc.) and watches
/ reacts to related events.
- Active resource management means Flink can actively start / terminate
workers as needed. Its key characteristic is that the resource a Flink
deployment uses is decided by the job's execution plan, unlike the opposite
reactive mode (resource available to the deployment decides the execution
plan) or the standalone mode (both execution plan and deployment resources
are predefined).

Currently, we have the yarn and native k8s deployments (and the recently
removed mesos deployment) in active mode, due to their ability to request /
release worker resources from the underlying cluster. And all the existing
operators, AFAIK, work with a Flink standalone deployment, where Flink
cannot request / release resources by itself.

From this perspective, I think a large part of the native k8s integration
advantages come from the active mode: being able to better understand the
job's resource requirements and adjust the deployment resource accordingly.
Both fine-grained resource management (customizing TM resources for
different tasks / operators) and adaptive batch scheduler (rescale the
deployment w.r.t. different stages) fall into this category.

I'm wondering if we can have an operator that also works with the active
mode. Instead of talking to the api server directly for adding / deleting
resources, Flink's active resource manager can talk to the operator (via
CR) about the resources the deployment needs, and let the operator to
actually add / remove the resources. The operator should be able to work
with (active) or without (standalone) the information of deployment's
resource requirements. In this way, users are free to choose between active
and reactive (e.g., HPA) rescaling, while always benefiting from the
beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
alignment with the K8s ecosystem (Flink client free, operating via kubectl,
etc.).

Thank you~

Xintong Song



On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise  wrote:

> Hi David,
>
> Thank you for the reply and context!
>
> As for workload types and where native integration might fit: I think
> that any k8s native solution that satisfies category 3) can also take
> care of 1) and 2) while the native integration by itself can't achieve
> that. Existence of [1] might serve as further indication.
>
> The k8s operator pattern would be an essential building block for a
> k8s native solution that is interoperable with k8s ecosystem tooling
> like kubectl, which is why [2] and subsequent derived art were
> created. Specifically the CRD allows us to directly express the
> concept of a Flink application consisting of job manager and task
> manager pods along with associated create/update/delete operations.
>
> Would it make sense to gauge interest to have such an operator as part
> of Flink? It appears so from discussions like [3]. I think such
> addition would significantly lower the barrier to adoption, since like
> you mentioned one cannot really run mission critical streaming
> workloads with just the Apache Flink release binaries alone. While it
> is great to have multiple k8s operators to choose from that are
> managed outside Flink, it is unfortunately also evident that today's
> hot operator turns into tomorrow's tech debt. I think such fate would
> be less likely within the project, when multiple parties can join
> forces and benefit from each other's contributions. There were similar
> considerations and discussions around Docker images in the past.
>
> Out of the features that you listed it is particularly the application
> upgrade that needs to be solved through an external process like
> operator. The good thing is that many folks have already thought hard
> about this and in existing implementations we see different strategies
> that have their merit and production mileage (certainly applies to
> [2]). We could combine the best of these ideas into a unified
> implementation as part of Flink itself as starting point.
>
> Cheers,
> Thomas
>
>
> [1] https://github.com/wangyang0918/flink-native-k8s-operator
> [2] https://github.com/lyft/flinkk8soperator
> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>
>
> On Tue, Jan 4, 2022 at 4:04 AM David Morávek  wrote:
> >
> > Hi Thomas,
> >
> > AFAIK there are no specific plans in this direction with the native
> integration, but I'd like to share some thoughts on the topic
> >
> > In my understanding there are three major groups of workloads in Flink:
> >
> > 1) Batch workloads
> > 2) Interactive workloads (Both Batch and 

Re: Exact-once processing when a job fails

2022-01-06 Thread Sharon Xie
Thanks again the for the detailed info. It makes a lot of sense.

One last question: can we create a checkpoint as soon as a job starts? In
this case, the first record’s offset will be in the checkpoint state and
this will provide a “strong” guarantee, as you said. Did I miss anything? I
read the code and realized that checkpointing happens in a separate thread,
which is why there isn’t an “initial” checkpoint now. But I want to know if
my idea would work in theory.
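
For reference, a minimal sketch (not your actual job) of configuring a short checkpoint interval so that the first checkpoint, and hence the first set of committed offsets, happens soon after the job starts. There is no "initial" checkpoint option, so this only shortens the window of "unsecured" output discussed below:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object EarlyCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A short interval bounds how long output stays "unsecured" after the job starts.
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Fail checkpoints that take too long instead of letting them linger.
    env.getCheckpointConfig.setCheckpointTimeout(60000L)
    // ... build the actual pipeline here, then env.execute("...")
  }
}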

On Wed, Jan 5, 2022 at 11:36 PM Caizhi Weng  wrote:

> Hi!
>
> Also note that although this eventual consistency seems not good enough,
> but for 99.99% of the time the job can run smoothly without failure. In
> this case the records are correct and good. Only in the 0.01% case when the
> job fails will user see inconsistency for a small period of time (for a
> checkpoint interval). If the user can tolerate this 0.01% chance of
> inconsistency then he can get this very low latency and mostly correct data.
>
> Caizhi Weng wrote on Thursday, January 6, 2022 at 10:40:
>
>> Hi!
>>
>> Flink guarantees *eventual* consistency for systems without transactions
>> (by transaction I mean a system supporting writing a few records then
>> commit), or with transactions but users prefer latency than consistency.
>> That is to say, everything produced by Flink before a checkpoint is "not
>> secured" (if you need *strong* consistency). If a failure happens after
>> a checkpoint C, then everything produced by Flink after C should be
>> ignored. Only the success of C + 1 guarantees that all these records are
>> now consistent (up to C + 1).
>>
>> If you prefer strong consistency and can tolerate latency of a few
>> minutes (the latency here is your checkpoint interval), you can try hive
>> sink. When writing to hive sink in a streaming job, records are only
>> visible in hive after the next checkpoint completes (so it is ok to process
>> a record several times, as long as its corresponding checkpoint hasn't
>> completed).
>>
>> group-offsets does not help in your case. There actually is an option to
>> commit offsets to Kafka during each checkpoint but Flink will also manage
>> offsets in its own state. If there is no checkpoint then group offsets
>> won't change.
>>
>> Sharon Xie wrote on Wednesday, January 5, 2022 at 13:18:
>>
>>> Hi Caizhi,
>>>
>>> Thank you for the quick response. Can you help me understand how
>>> reprocessing the data with the earliest starting-offset ensures exactly
>>> once processing? 1st, the earliest offset could be way beyond the 1st
>>> record in my example since the first time the job started from the latest
>>> offset. 2nd, even if there are only two records in the topic, the 1st
>>> record was already processed before so I'd think the 1st record would be
>>> processed twice if the earliest offset is used.
>>>
>>> Another thing I found from the doc start reading position
>>> 
>>>
>>> >The default option value is group-offsets which indicates to consume
>>> from last committed offsets in ZK / Kafka brokers.
>>>
>>> It seems that there is a way to resume processing from the
>>> "group-offsets" where its value would be the offset of the 1st record in my
>>> scenario. However, I can't make it work based on my test. I'm using
>>> application mode deployment so my guess is that the 2nd job (in a new
>>> cluster) internally has a different kafka consumer group id. Any ideas to
>>> make it work?
>>>
>>>
>>> Thanks,
>>> Sharon
>>>
>>> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng  wrote:
>>>
 Hi!

 This is a valid case. This starting-offset is the offset for Kafka
 source to read from when the job starts *without checkpoint*. That is
 to say, if your job has been running for a while, completed several
 checkpoints and then restarted, Kafka source won't read from
 starting-offset, but from the offset logged in the checkpoint.

 As you're only processing 2 records (which I guess takes less than a
 second) no checkpoint has been completed (if you didn't change any
 configurations the default checkpointing interval is 10 minutes), so the
 next time you start the same job it will still read from starting-offset,
 which is the latest offset by your setting.

 If you would like to reprocess the second record you can set the
 starting-offset to earliest (which is the default setting). The first
 record will also be reprocessed but this is still valid because it is just
 updating the result for the first record (which is the same as your
 previous execution).

 Sharon Xie wrote on Wednesday, January 5, 2022 at 02:56:

> Can someone help me understand how Flink deals with the following
> scenario?
>
> I have a job that reads from a source Kafka (starting-offset: latest)
> and writes to a sink Kafka with exactly-once execution. Let's say that I
> have 2 records in the source. The 1st one is processed 

Re: Re: A question about Flink SQL: how to remove null fields

2022-01-06 Thread RS
Hi,
Thanks for the reply. I have also tested this kind of approach:
I added a parameter to the JSON format so that null fields are removed from the row during serialization. However, this requires modifying the code and maintaining a separately updated flink-json jar, which may cause maintenance problems later on.
This approach is well suited to writing to ES and to files, since there are no redundant fields.
It would be convenient if the community could add this feature or merge it in.

On 2022-01-06 21:18:37, "Benchao Li" wrote:
>Internally, we added a feature to the JSON format that allows null fields to be skipped during serialization. It mainly addresses the ES write scenario as well.
>You could give it a try too.
>
>RS wrote on Wednesday, December 29, 2021 at 16:41:
>
>> Hi,
>> I use Flink SQL to consume from Kafka and write to ES. Sometimes some fields are absent, and I don't want to write the absent fields to ES. How should this case be handled?
>>
>>
>> For example, the source data has 3 fields: a, b, c
>> insert into table2
>> select
>> a,b,c
>> from table1
>> When b = null, only write a and c
>> When c = null, only write a and b
>>
>>
>
>-- 
>
>Best,
>Benchao Li


Re: Re: Re: Flink SQL retract stream sink optimization question

2022-01-06 Thread Benchao Li
Mini-batch is effective for the aggregate operator. With it enabled, the operator's output volume drops somewhat, which reduces the output pressure on the sink.
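
For reference, a minimal sketch of enabling mini-batch aggregation (option keys as in the docs linked in the quoted reply below; the values are illustrative, assuming the Flink 1.13+ Table API):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object MiniBatchSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    val conf = tEnv.getConfig.getConfiguration
    // Buffer input records and emit aggregation updates in small batches,
    // which reduces the number of update/retract records reaching the sink.
    conf.setString("table.exec.mini-batch.enabled", "true")
    conf.setString("table.exec.mini-batch.allow-latency", "1 s")
    conf.setString("table.exec.mini-batch.size", "5000")
  }
}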

casel.chen wrote on Friday, January 7, 2022 at 07:42:

> Does the mini-batch optimization also take effect for the sink operator? I aggregate without window directly and then output the aggregation result to the sink operator.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2022-01-06 20:43:00, "Benchao Li" wrote:
> >This problem can be solved with mini-batch [1].
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
> >
> >casel.chen wrote on Sunday, December 26, 2021 at 18:01:
> >
> >> Do you mean these two upsert-kafka parameters?
> >>
> >> sink.buffer-flush.max-rows
> >> sink.buffer-flush.interval
> >> They can indeed achieve the effect I want, but they introduce an extra Kafka sink, and the data still
> >> has to be consumed from the sink Kafka and written into MySQL. The pipeline is a bit long; ideally, an aggregation operator could be added before the sink on top of the original job.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-12-25 22:54:19,"郭伟权"  写道:
> >>
> >>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >> >
> >> >casel.chen  于2021年12月23日周四 08:15写道:
> >> >
> >> >> flink sql中aggregate without
> >> >>
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >> >>
> >> >>
> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >> >>
> >> >> orderid.   categorydt
> amt
> >> >>
> >> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >> >>
> >> >>
> >> >>
> >> >> INSERT INTO mysql_sink_table
> >> >>
> >> >> SELECT category, dt, LAST_VALUE(total)
> >> >>
> >> >> OVER (
> >> >>
> >> >>   PARTITION BY category
> >> >>
> >> >>   ORDER BY PROCTIME()
> >> >>
> >> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >> >>
> >> >> ) AS var1
> >> >>
> >> >> FROM (
> >> >>
> >> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category,
> dt
> >> >>
> >> >> );
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 

Best,
Benchao Li


Re: Re: Re: Flink SQL retract stream sink optimization question

2022-01-06 Thread casel.chen
Does the mini-batch optimization also take effect for the sink operator? I aggregate without window directly and then output the aggregation result to the sink operator.

















On 2022-01-06 20:43:00, "Benchao Li" wrote:
>This problem can be solved with mini-batch [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
>
>casel.chen wrote on Sunday, December 26, 2021 at 18:01:
>
>> Do you mean these two upsert-kafka parameters?
>>
>> sink.buffer-flush.max-rows
>> sink.buffer-flush.interval
>> They can indeed achieve the effect I want, but they introduce an extra Kafka sink, and the data still
>> has to be consumed from the sink Kafka and written into MySQL. The pipeline is a bit long; ideally, an aggregation operator could be added before the sink on top of the original job.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2021-12-25 22:54:19, "郭伟权" wrote:
>>
>> >Output the result to upsert-kafka. In version 1.13, upsert-kafka has two parameters that buffer inside Flink and can greatly reduce the number of messages written to Kafka.
>> >
>> >casel.chen wrote on Thursday, December 23, 2021 at 08:15:
>> >
>> >> In Flink SQL, if the retract stream produced by an aggregate without
>> >> window is not optimized, the sink emits one output record per incoming source record, which puts a lot of write pressure on the downstream system, e.g. MySQL.
>> >> To optimize this retract stream sink output, how can I cache the records and only sink the last record per key downstream?
>> >> Can I use the last_value function in an OVER window? Does an OVER window support being applied to a retract stream?
>> >>
>> >>
>> >> For example, given the following binlog CDC purchase data (the order purchase amount can be updated):
>> >>
>> >> orderid   category   dt   amt
>> >>
>> >> order id   product category   purchase time (MMddHH)   purchase amount
>> >>
>> >>
>> >>
>> >>
>> >> To compute the hourly total transaction amount per product category (written to the downstream MySQL every minute), can it be implemented with the Flink SQL below, combined with a state TTL of 1 hour?
>> >>
>> >>
>> >>
>> >> INSERT INTO mysql_sink_table
>> >>
>> >> SELECT category, dt, LAST_VALUE(total)
>> >>
>> >> OVER (
>> >>
>> >>   PARTITION BY category
>> >>
>> >>   ORDER BY PROCTIME()
>> >>
>> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>> >>
>> >> ) AS var1
>> >>
>> >> FROM (
>> >>
>> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>> >>
>> >> );
>>
>
>
>-- 
>
>Best,
>Benchao Li


reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode

2022-01-06 Thread martin
Hi,
I have a job where I do a keyBy'd process function (a), then downstream a normal process function (b), and then I use reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
The last keyed process function uses keyed state for an increasing sequence number.
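For reference, a rough sketch of that wiring (all names are hypothetical; it assumes the Scala DataStream's javaStream accessor for interop with the Java-side DataStreamUtils.reinterpretAsKeyedStream, and that step (b) leaves the key and the partitioning untouched):

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

object ReinterpretSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in source: (transactionId, payload) events.
    val events: DataStream[(String, String)] = env.fromElements(("tx-1", "a"), ("tx-1", "b"))

    // (a) keyBy'd step, then (b) a step that must not change the key or the partitioning.
    val afterB: DataStream[(String, String)] = events
      .keyBy(_._1)
      .map(e => e) // stand-in for keyed process function (a)
      .map(e => e) // stand-in for non-keyed process function (b)

    // (c) reinterpret as keyed again without another shuffle; keyed state is usable downstream.
    val keySelector = new KeySelector[(String, String), String] {
      override def getKey(v: (String, String)): String = v._1
    }
    val reKeyed = DataStreamUtils.reinterpretAsKeyedStream(afterB.javaStream, keySelector)
    // reKeyed.process(...) would be keyed process function (c) holding the sequence number.

    afterB.print()
    env.execute("reinterpret-sketch")
  }
}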
I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.
I use Flink 1.14.2.
I test that job on four use cases: (1) static parallelism, (2) scale-out, (3) scale-in, (4) task manager fail*.
* via kill -SIGTERM inside the container for the flink JVM
For test use cases (1) and (2) everything is fine.
For test use cases (3) and (4), the state of the keyed process function (c) seems to be available for only around 50% of the events processed after the scale-in/fail.
Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?
If yes, are there already any ideas what the root cause could be?
 
BR
Martin
 


Flink rest api to start a job

2022-01-06 Thread Qihua Yang
Hi,

I found a weird behavior. We launched a k8s cluster without a job, but it
includes jar A. I use the Flink REST API to upload a dummy jar (actually it
can be any jar), and Flink creates a jar id. Then I use the REST API to start
the job with that dummy jar id but jar A's entry-class, and
Flink starts the job from jar A. Does anyone know why?
My understanding is that the Flink REST API should start the job from the dummy jar,
because the jar id is the dummy jar id that I uploaded.
Here are the steps of what I did:
1. Deploy a k8s pod containing the working jar (testA.jar).
2. Use the Flink REST API to upload a jar, testB.jar; Flink generates a jar id,
2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar.
3. Use the Flink REST API to run the jar with testB.jar's id but testA.jar's entry-class.
4. Flink starts the job from testA.jar.

Thanks,
Qihua


Trigger the producer to send data to the consumer after mentioned seconds

2022-01-06 Thread Flink Lover
Hello Folks!

I have a DataStream which sends data to the consumer, but I only got the data
once the code completed its execution. I didn't receive the records while the
code was writing them to the topic. I was able to achieve this behavior using
the AT_LEAST_ONCE property, but I decided to implement watermarks. I haven't
enabled checkpointing as of now. I know checkpointing will also do the
trick. My expectation is that the producer should batch the records of 2 seconds
and send them to the consumer, and the consumer should receive a batch of 2
seconds. My code goes as below:

*Producer Side*:
 dataToKafka.assignTimestampsAndWatermarks(
  WatermarkStrategy
.forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
dataToKafka.addSink(myProducer).uid("source")

*Consumer Side*:
consumer.assignTimestampsAndWatermarks(
  WatermarkStrategy
.forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))

Now this gives me an error as below:

Static methods in interface require -target:jvm-1.8
.forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))

My scala version is 2.11.12 and Java JDK 1.8.0.281
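
For reference, that compiler message points at the scalac bytecode target rather than the installed JDK; a minimal sketch of setting it, assuming an sbt build (Maven/Gradle have equivalent settings):

// build.sbt (sketch): make scalac 2.11 emit Java 8 bytecode so it can call
// static methods on Java interfaces such as WatermarkStrategy.forBoundedOutOfOrderness.
scalaVersion := "2.11.12"
scalacOptions += "-target:jvm-1.8"
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")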

Thanks,
Martin.


Moving off of TypeInformation in Flink 1.11

2022-01-06 Thread Sofya T. Irwin
Hi,

I’m moving my Flink 1.11 application onto the Blink Table Planner; and off
 of TypeInformation and onto DataTypes in preparation for upgrading Flink
 to Flink 1.13 or higher.

I’m having trouble moving off of TypeInformation.

Specifically I have a section of code that maps a DataStream[Message] to a
DataStream[Row]:

  implicit val typeInformation: TypeInformation[Row] =
 myObject.getProducedType
  val resultStream: DataStream[Row] = dataStream.map(msg =>
myTransform(msg))

Note that myTransform() takes in a Message object and returns a Row object.
Message is an internal class that we are using.
The resultStream:DataStream[Row] is passed as a StreamTableSource[Row]
later.

If I comment out the implicit val above, I get a failure:

  TableSource of type com.MyTableSourceFromDataStream returned a DataStream
of data type
  GenericType that does not match with the data
type
  ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by
the
  TableSource.getProducedDataType() method. Please validate the
implementation of the TableSource.

I checked the Flink 1.11.4, Flink 1.13, and most recent sources and it
seems that the implementation of DataStream.map() is not changed and still
uses TypeInformation.

https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala#L657

Based on the code above it seems that the issue is that Flink's
DataStream.map function uses TypeInformation.

I’m not sure if there’s an equivalent DataType implicit that I should be
declaring instead. Or if I should be using some function other than map

Do you have any suggestions for how to proceed? I'd like to completely move
off of TypeInformation in my app.

Thanks,
Sofya


RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
Hi,

As I am new, I am facing one issue, and I came across RichMapFunction. How
can I use RichMapFunction to convert a tuple of strings to a DataStream? If
not, how can I do it apart from using StreamExecutionEnvironment?
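
For reference, a minimal sketch (all names hypothetical) of the usual pattern: first obtain a DataStream from the StreamExecutionEnvironment (e.g. from a collection), then apply a RichMapFunction with map:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

object TupleToStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A DataStream always comes from the environment first (collection, source, ...).
    val tuples: DataStream[(String, String)] = env.fromElements(("k1", "v1"), ("k2", "v2"))

    // A RichMapFunction then transforms that existing stream element by element.
    val mapped: DataStream[(String, String)] = tuples.map(
      new RichMapFunction[(String, String), (String, String)] {
        override def map(in: (String, String)): (String, String) = (in._1, in._2.toUpperCase)
      })

    mapped.print()
    env.execute("tuple-to-stream-sketch")
  }
}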

Thanks,
Sid


adding elapsed times to events that form a transaction

2022-01-06 Thread HG
Hello all,

My question is basically whether it is possible to group events by a key
(these will belong to a specific transaction) and then calculate the
elapsed times between them based on a timestamp that is present in the
event.
So a transaction may have x events, all timestamped and with the
transaction_id as key.
Is it possible to
1. group them by the key
2. order by the timestamp,
3. calculate the elapsed times between the steps/event
4. add that elapsed time to the step/event
5. output the modified events to the sink
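
For reference, a minimal sketch of one way to do this with a KeyedProcessFunction (all names hypothetical; it assumes the events of a transaction arrive in timestamp order per key — otherwise they would first need to be buffered and sorted, e.g. with event-time timers):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event shape: a transaction id, the event's own timestamp, and a slot for the elapsed time.
case class Step(txId: String, ts: Long, elapsedMs: Long = -1L)

class ElapsedTimeFunction extends KeyedProcessFunction[String, Step, Step] {
  @transient private var lastTs: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    lastTs = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastTs", classOf[java.lang.Long]))

  override def processElement(step: Step,
                              ctx: KeyedProcessFunction[String, Step, Step]#Context,
                              out: Collector[Step]): Unit = {
    val prev = lastTs.value()
    // Elapsed time relative to the previous event of the same transaction (0 for the first event).
    val elapsed = if (prev == null) 0L else step.ts - prev
    lastTs.update(step.ts)
    out.collect(step.copy(elapsedMs = elapsed))
  }
}

// Usage sketch: events.keyBy(_.txId).process(new ElapsedTimeFunction()).addSink(...)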



Regards Hans


Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
I was able to modify the code and get the tuple in case of Success. How do
I pass the tuple to the Failure part?

Try {
  //
  //some processing

if (!validationMessages.isEmpty) {
(parsedJson.toString(), validationMessages.foreach(x => {
  val msg: String = x.getMessage
  msg
}).toString())
  }
  else {
(parsedJson.toString(), "Good Record...")
  }

}
match {
  case Success(x) => {
Right(x)
  }
  case Failure(err) => {
Left(json)
  }
}
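
For reference, a sketch of one way to carry the original JSON and the error message into the Failure branch: make the Left side a tuple as well (the object wrapper is only for self-containment; the schema is passed in, matching the schema built in the earlier message):

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.JsonSchema
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object ParseJsonSketch {
  // Sketch: both sides of the Either carry a tuple, so the Failure branch also
  // gets the original JSON together with the error message.
  def parseJson(json: String, schema: JsonSchema): Either[(String, String), (String, String)] =
    Try {
      val parsedJson = new ObjectMapper().readTree(json)
      val validationMessages = schema.validate(parsedJson).asScala
      // require(...) fails the Try; the joined validation messages end up in err.getMessage.
      require(validationMessages.isEmpty, validationMessages.map(_.getMessage).mkString("; "))
      (parsedJson.toString, "Good Record...")
    } match {
      case Success(ok)  => Right(ok)
      case Failure(err) => Left((json, err.getMessage))
    }
}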


On Thu, Jan 6, 2022 at 1:43 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> Thanks, Caizhi for your explanation. It helped me to understand where I
> went wrong.
>
> On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> The last expression in your try block is
>>
>> if(validationMessages.isEmpty) {
>>   (parsedJson.toString(),
>> validationMessages.foreach((msg=>msg.getMessage.toString)))
>> } else {
>>   (parsedJson.toString(), "Format is correct...")
>> }
>>
>> The first one produces a (String, Unit) type while the second one
>> produces a (String, String) type, so the whole if expression produces
>> (String, Any) type. However your parseJson should return Either[String,
>> String], thus causing this issue.
>>
>>
>>> Siddhesh Kalgaonkar wrote on Wednesday, January 5, 2022 at 19:04:
>>
>>> I have written a process function where I am parsing the JSON and if it
>>> is not according to the expected format it passes as Failure to the process
>>> function and I print the records which are working fine. Now, I was trying
>>> to print the message and the record in case of Success and Failure. I
>>> implemented the below code and it gave me the error. What exactly I am
>>> missing?
>>>
>>> package KafkaAsSource
>>>
>>> import com.fasterxml.jackson.databind.ObjectMapper
>>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>>> import org.apache.flink.api.scala.createTypeInformation
>>> import org.apache.flink.streaming.api.functions.ProcessFunction
>>> import org.apache.flink.streaming.api.scala.OutputTag
>>> import org.apache.flink.util.Collector
>>> import scala.jdk.CollectionConverters._
>>> import scala.util.{Failure, Success, Try}
>>>
>>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>>   val outputTag = new OutputTag[String]("failed")
>>>
>>>   def parseJson(json: String): Either[String, String] = {
>>> val schemaJsonString =
>>>   """
>>> {
>>> "$schema": "http://json-schema.org/draft-04/schema#;,
>>> "title": "Product",
>>> "description": "A product from the catalog",
>>> "type": "object",
>>> "properties": {
>>> "id": {
>>> "description": "The unique identifier for a product",
>>> "type": "integer"
>>> },
>>> "premium": {
>>> "description": "Annual Premium",
>>> "type": "integer"
>>> },
>>> "eventTime": {
>>> "description": "Timestamp at which record has arrived at source 
>>> / generated",
>>> "type": "string"
>>> }
>>> },
>>> "required": ["id", "premium","eventTime"]
>>> }
>>> """
>>> Try {
>>>   val schema = 
>>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>>   // You can read a JSON object from String, a file, URL, etc.
>>>   val parsedJson = new ObjectMapper().readTree(json)
>>>   val validationMessages = schema.validate(parsedJson).asScala
>>>   //validationMessages.foreach(msg => println(msg.getMessage))
>>>   require(validationMessages.isEmpty)
>>>   //parsedJson.toString()
>>>   if(validationMessages.isEmpty)
>>> {
>>>   
>>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>> }
>>>   else
>>> {
>>>   (parsedJson.toString(),"Format is correct...")
>>> }
>>>
>>> }
>>> match {
>>>   case Success(x) => {
>>> println("Good: " + x)
>>> Right(x)
>>>   }
>>>   case Failure(err) => {
>>> println("Bad:  " + json)
>>> Left(json)
>>>   }
>>> }
>>>   }
>>>   override def processElement(i: String, context: ProcessFunction[String, 
>>> String]#Context, collector: Collector[String]): Unit = {
>>> parseJson(i) match {
>>>   case Right(data) => {
>>> collector.collect(data)
>>> println("Good Records: " + data)
>>>   }
>>>   case Left(json) => {
>>> context.output(outputTag, json)
>>> println("Bad Records: " + json)
>>>   }
>>> }
>>>   }
>>> }
>>>
>>>
>>> Error:
>>>
>>> type mismatch;
>>>  found   : (String, Any)
>>>  required: String
>>> Right(x)
>>>
>>>


Re: CVE-2021-44228 - Log4j2 vulnerability

2022-01-06 Thread Martijn Visser
Hi all,

The ticket for upgrading Log4J to 2.17.0 is
https://issues.apache.org/jira/browse/FLINK-25375. There's also the update
to Log4j 2.17.1 which is tracked under
https://issues.apache.org/jira/browse/FLINK-25472

As you can see, both have a fix version set to 1.14.3 and 1.13.6. These
versions haven't been released yet. Flink 1.14.3 is in preparation, this
hasn't started yet for Flink 1.13.6.

Best regards,

Martijn

On Thu, 6 Jan 2022 at 15:05,  wrote:

> Hi,
>
>
>
> just to be sure: Which Flink Releases for 1.14 and 1.13 have the upgraded
> log4j version 2.17.0?
>
> Are those already deployed to docker?
>
>
>
> Many Thanks in Advance.
>
>
>
> Kind Regards,
>
>
>
> Patrick
>
> --
>
> Patrick Eifler
>
>
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure
> Sony Interactive Entertainment LLC
>
> Wilhelmstraße 118, 10963 Berlin
>
>
> Germany
>
> E: patrick.eif...@sony.com
>
>
>
> *From: *David Morávek 
> *Date: *Wednesday, 29. December 2021 at 09:35
> *To: *narasimha 
> *Cc: *Debraj Manna , Martijn Visser <
> mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com>, Chesnay Schepler , user <
> user@flink.apache.org>, Michael Guterl , Richard
> Deurwaarder , Parag Somani 
> *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability
>
> Please follow the above mentioned ML thread for more details. Please note
> that this is a REGULAR release that is not motivated by the log4j CVE, so
> the stability of the release is the more important factor then having it
> out as soon as possible.
>
>
>
> D.
>
>
>
> On Mon, Dec 27, 2021 at 6:33 AM narasimha  wrote:
>
> Hi folks,
>
>
>
> When can we expect the release to be made available to the community?
>
>
>
> On Wed, Dec 22, 2021 at 3:07 PM David Morávek  wrote:
>
> Hi Debraj,
>
>
>
> we're currently not planning another emergency release as this CVE is not
> as critical for Flink users as the previous one. However, this patch will
> be included in all upcoming patch & minor releases. The patch release for
> the 1.14.x branch is already in progress [1] (it may be bit delayed due to
> the holiday season).
>
>
>
> [1] https://lists.apache.org/thread/24v8bh3jww7c5bvfgov9cp5mb0wtj7tk
> 
>
>
>
> Best,
>
> D.
>
>
>
> On Wed, Dec 22, 2021 at 7:02 AM Debraj Manna 
> wrote:
>
> Any idea when can we expect
> https://issues.apache.org/jira/browse/FLINK-25375
> 
> to be released?
>
>
>
> On Mon, Dec 20, 2021 at 8:18 PM Martijn Visser 
> wrote:
>
> Hi,
>
>
>
> The status and Flink ticket for upgrading to Log4j 2.17.0 can be tracked
> at https://issues.apache.org/jira/browse/FLINK-25375
> 
> .
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Sat, 18 Dec 2021 at 16:50, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
> Hi,
>
>
>
> It seems there is high severity vulnerability in log4j 2.16.0.(
> CVE-2021-45105
> 
> )
>
> Refer : https://logging.apache.org/log4j/2.x/security.html
> 
>
> Any update on this please?
>
>
>
> Regards,
>
> Suchithra
>
>
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, December 16, 2021 4:35 PM
> *To:* Parag Somani 
> *Cc:* Michael Guterl ; V N, Suchithra (Nokia -
> IN/Bangalore) ; Richard Deurwaarder <
> rich...@xeli.eu>; user 
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> We will announce the releases when the binaries are available.
>
>
>
> On 16/12/2021 05:37, Parag Somani wrote:
>
> Thank you Chesnay for expediting this fix...!
>
>
>
> Can you suggest, when can I get binaries for 1.14.2 flink version?
>
>
>
> On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler 
> wrote:
>
> We will push docker images for all new releases, yes.
>
>
>
> On 16/12/2021 01:16, Michael Guterl wrote:
>
> Will you all be pushing Docker images for the 1.11.6 release?
>
>
>
> On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler 
> wrote:
>
> The current ETA is 40h for an official announcement.
>
> We are validating the release today (concludes in 16h), publish it
> tonight, then wait for mirrors to be sync (about a day), then we announce
> it.
>
>
>
> On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:
>
> Hello,
>
>
>
> 

Re: CVE-2021-44228 - Log4j2 vulnerability

2022-01-06 Thread Patrick.Eifler
Hi,

just to be sure: Which Flink Releases for 1.14 and 1.13 have the upgraded log4j 
version 2.17.0?
Are those already deployed to docker?

Many Thanks in Advance.

Kind Regards,

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com

From: David Morávek 
Date: Wednesday, 29. December 2021 at 09:35
To: narasimha 
Cc: Debraj Manna , Martijn Visser 
, V N, Suchithra (Nokia - IN/Bangalore) 
, Chesnay Schepler , user 
, Michael Guterl , Richard 
Deurwaarder , Parag Somani 
Subject: Re: CVE-2021-44228 - Log4j2 vulnerability
Please follow the above mentioned ML thread for more details. Please note that 
this is a REGULAR release that is not motivated by the log4j CVE, so the 
stability of the release is the more important factor then having it out as 
soon as possible.

D.

On Mon, Dec 27, 2021 at 6:33 AM narasimha 
mailto:swamy.haj...@gmail.com>> wrote:
Hi folks,

When can we expect the release to be made available to the community?

On Wed, Dec 22, 2021 at 3:07 PM David Morávek 
mailto:d...@apache.org>> wrote:
Hi Debraj,

we're currently not planning another emergency release as this CVE is not as 
critical for Flink users as the previous one. However, this patch will be 
included in all upcoming patch & minor releases. The patch release for the 
1.14.x branch is already in progress [1] (it may be a bit delayed due to the
holiday season).

[1] 
https://lists.apache.org/thread/24v8bh3jww7c5bvfgov9cp5mb0wtj7tk

Best,
D.

On Wed, Dec 22, 2021 at 7:02 AM Debraj Manna  wrote:
Any idea when can we expect 
https://issues.apache.org/jira/browse/FLINK-25375
 to be released?

On Mon, Dec 20, 2021 at 8:18 PM Martijn Visser  wrote:
Hi,

The status and Flink ticket for upgrading to Log4j 2.17.0 can be tracked at 
https://issues.apache.org/jira/browse/FLINK-25375.

Best regards,

Martijn

On Sat, 18 Dec 2021 at 16:50, V N, Suchithra (Nokia - IN/Bangalore)  wrote:
Hi,

It seems there is a high-severity vulnerability in Log4j
2.16.0 (CVE-2021-45105).
Refer : 
https://logging.apache.org/log4j/2.x/security.html
Any update on this please?

Regards,
Suchithra

From: Chesnay Schepler 
Sent: Thursday, December 16, 2021 4:35 PM
To: Parag Somani 
Cc: Michael Guterl ; V N, Suchithra (Nokia - IN/Bangalore) ;
Richard Deurwaarder ; user 
Subject: Re: CVE-2021-44228 - Log4j2 vulnerability

We will announce the releases when the binaries are available.

On 16/12/2021 05:37, Parag Somani wrote:
Thank you Chesnay for expediting this fix...!

Can you suggest, when can I get binaries for 1.14.2 flink version?

On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler  wrote:
We will push docker images for all new releases, yes.

On 16/12/2021 01:16, Michael Guterl wrote:
Will you all be pushing Docker images for the 1.11.6 release?

On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler  wrote:
The current ETA is 40h for an official announcement.
We are validating the release today (concludes in 16h), will publish it tonight,
then wait for the mirrors to sync (about a day), and then we will announce it.

On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:
Hello,

Could you please tell when we can expect Flink 1.12.7 release? We are waiting 
for the CVE fix.

Regards,
Suchithra


From: Chesnay Schepler 
Sent: Wednesday, December 15, 2021 4:04 PM
To: Richard Deurwaarder 
Cc: user 
Subject: Re: CVE-2021-44228 - Log4j2 vulnerability

We will also update the docker images.

On 15/12/2021 11:29, Richard Deurwaarder wrote:
Thanks for picking this up quickly!

I saw you've made a second minor upgrade to upgrade to log4j2 2.16 which is 
perfect.

Just to clarify: Will you 

Re: Converting parquet MessageType to flink RowType

2022-01-06 Thread Meghajit Mazumdar
Hi Jing,

Thanks for explaining this. This helps.

As you suggested, I tried specifying some of the field names with the field
types for my parquet files, and it works. I am able to read the
specific fields.

However, I also have some nested fields in my parquet schema, like the one
below, which I want to read:

  optional group location = 11 {
    optional double latitude = 1;
    optional double longitude = 2;
  }

How do you suggest I create a RowType for this? I did something like the code
below, but I got the exception `Caused by:
java.lang.UnsupportedOperationException: Complex types not supported`

RowType nestedRowType =
        RowType.of(
                new LogicalType[] {new DoubleType(), new DoubleType()},
                new String[] {"latitude", "longitude"});
final LogicalType[] fieldTypes = new LogicalType[] {nestedRowType};
final ParquetColumnarRowInputFormat format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"location"}),
                500,
                false,
                true);

On Thu, Jan 6, 2022 at 6:54 PM Jing Ge  wrote:

> Hi Meghajit,
>
> good catch! Thanks for correcting me. The question is about how to use
> column-oriented storage format like Parquet. What I tried to explain was
> that the original MessageType has been used to build a projected
> MessageType, since only required columns should be read. Without the input
> from the user, there is no way to build the projected schema except read
> all columns. Even if we could convert the MessageType to RowType, we would
> still need the user's input. The fieldTypes are therefore (mandatorily)
> required with current implementation because, when the given fields could
> not be found *by the ParquetVectorizedInputFormat *in the parquet footer,
> a type info is still needed to build the projected schema.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 12:38 PM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hi Jing,
>>
>> Thanks for the reply.
>> Had 2 doubts related to your answer :
>>
>> 1. There was a conversion from Flink GroupType to Parquet MessageType. It
>> might be possible to build the conversion the other way around.
>> -> Both GroupType and MessageType are parquet data structures I believe,
>> present in the org.apache.parquet.schema package. I am actually looking if
>> it is possible to convert it into a Flink data type, such as RowType.
>>
>> 2. The fieldTypes are required in case the given fields could not be
>> found in the parquet footer, like for example typo.
>> -> Does this mean that fieldTypes are not required to be given during the
>> construction of RowType ? I tried leaving it empty as below, but it gave an
>> exception *Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.table.data.vector.ColumnVector*
>>
>> final ParquetColumnarRowInputFormat format =
>> new ParquetColumnarRowInputFormat<>(
>> new Configuration(),
>> RowType.of(new LogicalType[]{}, new
>> String[]{"field_name_1", "field_name_2"}),
>> 500,
>> false,
>> true);
>>
>> Regards,
>> Meghajit
>>
>> On Thu, Jan 6, 2022 at 3:43 PM Jing Ge  wrote:
>>
>>> Hi Meghajit,
>>>
>>> thanks for asking. If you took a look at the source code
>>> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
>>> you should see Parquet MessageType has been read from the footer and used.
>>> There was a conversion from Flink GroupType to Parquet MessageType. It
>>> might be possible to build the conversion the other way around. But the
>>> question is about the performance, because only the required columns should
>>> be read, therefore the column names should be given by the user. The
>>> fieldTypes are required in case the given fields could not be found in the
>>> parquet footer, like for example typo.
>>>
>>> Best regards
>>> Jing
>>>
>>> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
>>> meghajit.mazum...@gojek.com> wrote:
>>>
 Hello,

 We want to read and process Parquet Files using a FileSource and the 
 DataStream API.


 Currently, as referenced from the documentation, this is the way in which a 

Re: Converting parquet MessageType to flink RowType

2022-01-06 Thread Jing Ge
Hi Meghajit,

good catch! Thanks for correcting me. The question is about how to use a
column-oriented storage format like Parquet. What I tried to explain was
that the original MessageType is used to build a projected MessageType,
since only the required columns should be read. Without input from the
user, there is no way to build the projected schema except to read all
columns. Even if we could convert the MessageType to RowType, we would
still need the user's input. The fieldTypes are therefore (mandatorily)
required with the current implementation because, when the given fields
cannot be found *by the ParquetVectorizedInputFormat* in the parquet
footer, type info is still needed to build the projected schema.
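
For the flat case, a rough sketch of going the other way (MessageType ->
RowType) could look like the code below. This is only an illustration, not
something Flink ships: it handles primitive fields only, ignores nested
groups and Parquet logical annotations, and the type mapping is a simplifying
assumption.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.types.logical.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ParquetSchemaToRowType {

    // Maps a flat, primitive-only Parquet MessageType to a Flink RowType.
    public static RowType toRowType(MessageType schema) {
        List<LogicalType> types = new ArrayList<>();
        List<String> names = new ArrayList<>();
        for (Type field : schema.getFields()) {
            if (!field.isPrimitive()) {
                throw new UnsupportedOperationException("Nested field: " + field.getName());
            }
            PrimitiveType.PrimitiveTypeName primitive =
                    field.asPrimitiveType().getPrimitiveTypeName();
            switch (primitive) {
                case BOOLEAN:
                    types.add(new BooleanType());
                    break;
                case INT32:
                    types.add(new IntType());
                    break;
                case INT64:
                    types.add(new BigIntType());
                    break;
                case FLOAT:
                    types.add(new FloatType());
                    break;
                case DOUBLE:
                    types.add(new DoubleType());
                    break;
                case BINARY:
                    // Assumes UTF-8 strings; real code should inspect the logical annotation.
                    types.add(new VarCharType(VarCharType.MAX_LENGTH));
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported type: " + primitive);
            }
            names.add(field.getName());
        }
        return RowType.of(types.toArray(new LogicalType[0]), names.toArray(new String[0]));
    }
}

The resulting RowType could then be passed to ParquetColumnarRowInputFormat as
in your snippet, but note the caveat above: without user input you end up
reading every column instead of a projection.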

Best regards
Jing

On Thu, Jan 6, 2022 at 12:38 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Jing,
>
> Thanks for the reply.
> Had 2 doubts related to your answer :
>
> 1. There was a conversion from Flink GroupType to Parquet MessageType. It
> might be possible to build the conversion the other way around.
> -> Both GroupType and MessageType are parquet data structures I believe,
> present in the org.apache.parquet.schema package. I am actually looking if
> it is possible to convert it into a Flink data type, such as RowType.
>
> 2. The fieldTypes are required in case the given fields could not be found
> in the parquet footer, like for example typo.
> -> Does this mean that fieldTypes are not required to be given during the
> construction of RowType ? I tried leaving it empty as below, but it gave an
> exception *Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.data.vector.ColumnVector*
>
> final ParquetColumnarRowInputFormat format =
> new ParquetColumnarRowInputFormat<>(
> new Configuration(),
> RowType.of(new LogicalType[]{}, new
> String[]{"field_name_1", "field_name_2"}),
> 500,
> false,
> true);
>
> Regards,
> Meghajit
>
> On Thu, Jan 6, 2022 at 3:43 PM Jing Ge  wrote:
>
>> Hi Meghajit,
>>
>> thanks for asking. If you took a look at the source code
>> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
>> you should see Parquet MessageType has been read from the footer and used.
>> There was a conversion from Flink GroupType to Parquet MessageType. It
>> might be possible to build the conversion the other way around. But the
>> question is about the performance, because only the required columns should
>> be read, therefore the column names should be given by the user. The
>> fieldTypes are required in case the given fields could not be found in the
>> parquet footer, like for example typo.
>>
>> Best regards
>> Jing
>>
>> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> wrote:
>>
>>> Hello,
>>>
>>> We want to read and process Parquet Files using a FileSource and the 
>>> DataStream API.
>>>
>>>
>>> Currently, as referenced from the documentation, this is the way in which a
>>> FileSource for Parquet is created. As can be seen, it requires the
>>> construction of a RowType like this:
>>>
>>>
>>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>>
>>>
>>> where fieldTypes is created like this:
>>>
>>>
>>> final LogicalType[] fieldTypes =
>>>     new LogicalType[] {
>>>         new DoubleType(), new IntType(), new VarCharType()
>>>     };
>>>
>>>
>>> Ideally, instead of specifying the column names (f7, f99, ...) and their
>>> data types (DoubleType, VarCharType, ...), we would like to use the schema
>>> of the Parquet file itself to create a RowType.
>>>
>>> The schema is present in the footer of the Parquet file, inside the
>>> metadata.
>>>
>>> We wanted to know if there is an easy way by which we can convert a
>>> parquet schema, i.e., *MessageType*, into a Flink *RowType* directly?
>>>
>>> The parquet schema of the file can be easily obtained by using 
>>> *org.apache.parquet.hadoop.ParquetFileReader* as follows:
>>>
>>>
>>> ParquetFileReader reader = 
>>> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>
>>> MessageType schema = reader.getFileMetaData().getSchema(); // this schema 
>>> has the field names as well as the data types of the parquet records
>>>
>>>
>>> As of now, 

Re: Re: Optimizing retract stream sink output in Flink SQL

2022-01-06 Thread Benchao Li
This problem can be solved with mini-batch [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
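
A rough sketch of what enabling it looks like (the keys are the mini-batch
options from the page above; the pipeline and the INSERT statement themselves
stay unchanged, and the table/environment names below are just placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MiniBatchAggregation {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Buffer input records and emit at most one updated result per key per batch,
        // instead of one update for every incoming source row.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");

        // The unbounded aggregation is then submitted as before, e.g.:
        // tEnv.executeSql(
        //     "INSERT INTO mysql_sink_table "
        //         + "SELECT category, dt, SUM(amt) FROM t1 GROUP BY category, dt");
    }
}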

casel.chen  wrote on Sun, Dec 26, 2021 at 18:01:

> Do you mean these two upsert-kafka parameters?
>
> sink.buffer-flush.max-rows
> sink.buffer-flush.interval
> They do achieve the effect I want, but they introduce an extra Kafka sink, and
> the data still has to be consumed from the sink Kafka topic and written to
> MySQL, so the pipeline is a bit long. Ideally an aggregation operator could be
> added before the sink in the original job.
>
> On 2021-12-25 22:54:19, 郭伟权  wrote:
>
> > Write the result to upsert-kafka. In version 1.13, upsert-kafka has two
> > parameters that buffer inside Flink and can greatly reduce the number of
> > messages written to Kafka.
> >
> > casel.chen  wrote on Thu, Dec 23, 2021 at 08:15:
> >
> >> In Flink SQL, if the retract stream produced by an aggregate without
> >> window is not optimized, the sink emits one record for every incoming
> >> source record, which puts a lot of write pressure on the downstream
> >> system, e.g. MySQL.
> >> How can this retract stream sink output be optimized, i.e. cache the
> >> records and only sink the last record per key downstream?
> >> Can I open an over window and use the last_value function? Does an over
> >> window work on a retract stream?
> >>
> >>
> >> For example, given the following binlog CDC purchase data (the order
> >> amount can be updated):
> >>
> >> orderid   category   dt   amt
> >>
> >> order id / product category / purchase time (MMddHH) / purchase amount
> >>
> >>
> >> To compute the total transaction amount per product category per hour
> >> (written to downstream MySQL every minute), can it be implemented with the
> >> Flink SQL below, combined with a state TTL of 1 hour?
> >>
> >>
> >>
> >> INSERT INTO mysql_sink_table
> >>
> >> SELECT category, dt, LAST_VALUE(total)
> >>
> >> OVER (
> >>
> >>   PARTITION BY category
> >>
> >>   ORDER BY PROCTIME()
> >>
> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >>
> >> ) AS var1
> >>
> >> FROM (
> >>
> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
> >>
> >> );
>


-- 

Best,
Benchao Li


Re: Converting parquet MessageType to flink RowType

2022-01-06 Thread Meghajit Mazumdar
Hi Jing,

Thanks for the reply.
I had two doubts related to your answer:

1. There was a conversion from Flink GroupType to Parquet MessageType. It
might be possible to build the conversion the other way around.
-> Both GroupType and MessageType are Parquet data structures, I believe,
present in the org.apache.parquet.schema package. What I am actually looking
for is whether it is possible to convert it into a Flink data type, such as
RowType.

2. The fieldTypes are required in case the given fields could not be found
in the parquet footer, like for example typo.
-> Does this mean that fieldTypes are not required to be given during the
construction of RowType? I tried leaving it empty as below, but it gave an
exception *Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.data.vector.ColumnVector*

final ParquetColumnarRowInputFormat format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(new LogicalType[]{}, new String[]{"field_name_1", "field_name_2"}),
                500,
                false,
                true);

Regards,
Meghajit

On Thu, Jan 6, 2022 at 3:43 PM Jing Ge  wrote:

> Hi Meghajit,
>
> thanks for asking. If you took a look at the source code
> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
> you should see Parquet MessageType has been read from the footer and used.
> There was a conversion from Flink GroupType to Parquet MessageType. It
> might be possible to build the conversion the other way around. But the
> question is about the performance, because only the required columns should
> be read, therefore the column names should be given by the user. The
> fieldTypes are required in case the given fields could not be found in the
> parquet footer, like for example typo.
>
> Best regards
> Jing
>
> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We want to read and process Parquet Files using a FileSource and the 
>> DataStream API.
>>
>>
>> Currently, as referenced from the documentation, this is the way in which a
>> FileSource for Parquet is created. As can be seen, it requires the
>> construction of a RowType like this:
>>
>>
>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>
>>
>> where fieldTypes is created like this:
>>
>>
>> final LogicalType[] fieldTypes =
>>     new LogicalType[] {
>>         new DoubleType(), new IntType(), new VarCharType()
>>     };
>>
>>
>> Ideally, instead of specifying the column names (f7, f99, ...) and their data
>> types (DoubleType, VarCharType, ...), we would like to use the schema of the
>> Parquet file itself to create a RowType.
>>
>> The schema is present in the footer of the Parquet file, inside the metadata.
>>
>> We wanted to know if there is an easy way by which we can convert a
>> parquet schema, i.e., *MessageType*, into a Flink *RowType* directly?
>>
>> The parquet schema of the file can be easily obtained by using 
>> *org.apache.parquet.hadoop.ParquetFileReader* as follows:
>>
>>
>> ParquetFileReader reader = 
>> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>
>> MessageType schema = reader.getFileMetaData().getSchema(); // this schema 
>> has the field names as well as the data types of the parquet records
>>
>>
>> As of now, because we couldn’t find a way to convert the schema into a 
>> RowType directly, we resorted to writing our own custom parser to parse a 
>> Parquet SimpleGroup into a Flink Row like this:
>>
>>
>> ParquetFileReader reader = 
>> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>
>> PageReadStore nextPage = reader.readNextRowGroup();
>>
>> Row row = parseToRow(SimpleGroup g); // custom parser function
>>
>>
>> Looking forward to an answer from the community. Thanks !
>>
>>
>> Regards,
>>
>> Meghajit
>>
>>
>>


Unsubscribe

2022-01-06 Thread 许友昌
Unsubscribe

Re: Converting parquet MessageType to flink RowType

2022-01-06 Thread Jing Ge
Hi Meghajit,

thanks for asking. If you take a look at the source code
https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
you will see that the Parquet MessageType is read from the footer and used.
There was a conversion from Flink GroupType to Parquet MessageType. It
might be possible to build the conversion the other way around. But the
question is about performance: because only the required columns should
be read, the column names should be given by the user. The
fieldTypes are required in case the given fields cannot be found in the
parquet footer, for example because of a typo.

Best regards
Jing

On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We want to read and process Parquet Files using a FileSource and the 
> DataStream API.
>
>
> Currently, as referenced from the documentation, this is the way in which a
> FileSource for Parquet is created. As can be seen, it requires the
> construction of a RowType like this:
>
>
> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>
>
> where fieldTypes is created like this:
>
>
> final LogicalType[] fieldTypes =
>     new LogicalType[] {
>         new DoubleType(), new IntType(), new VarCharType()
>     };
>
>
> Ideally, instead of specifying the column names (f7, f99, ...) and their data
> types (DoubleType, VarCharType, ...), we would like to use the schema of the
> Parquet file itself to create a RowType.
>
> The schema is present in the footer of the Parquet file, inside the metadata.
>
> We wanted to know if there is an easy way by which we can convert a
> parquet schema, i.e., *MessageType*, into a Flink *RowType* directly?
>
> The parquet schema of the file can be easily obtained by using 
> *org.apache.parquet.hadoop.ParquetFileReader* as follows:
>
>
> ParquetFileReader reader = 
> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>
> MessageType schema = reader.getFileMetaData().getSchema(); // this schema has 
> the field names as well as the data types of the parquet records
>
>
> As of now, because we couldn’t find a way to convert the schema into a 
> RowType directly, we resorted to writing our own custom parser to parse a 
> Parquet SimpleGroup into a Flink Row like this:
>
>
> ParquetFileReader reader = 
> ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>
> PageReadStore nextPage = reader.readNextRowGroup();
>
> Row row = parseToRow(SimpleGroup g); // custom parser function
>
>
> Looking forward to an answer from the community. Thanks !
>
>
> Regards,
>
> Meghajit
>
>
>


Re: flinkCDC2.1.1

2022-01-06 Thread Fei Han
The Flink CDC data synchronization job cannot be submitted (standalone); an issue has also been filed. The error is as follows:
WARN  org.apache.flink.table.client.cli.CliClient  [] - Could 
not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute 
SQL statement.
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:228)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:518) 
~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:507) 
~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:409) 
~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
 [flink-sql-client_2.12-1.13.3.jar:1.13.3]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_211]
at 
org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
 [flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.executeFile(CliClient.java:316) 
[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.cli.CliClient.executeInNonInteractiveMode(CliClient.java:230)
 [flink-sql-client_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:153) 
[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client_2.12-1.13.3.jar:1.13.3]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:777)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'orders'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:759)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226)
 ~[flink-sql-client_2.12-1.13.3.jar:1.13.3]
... 12 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 ~[?:1.8.0_211]
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 ~[?:1.8.0_211]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_211]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_211]
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 

Re: The mailing list archive is not accessible

2022-01-06 Thread Fei Han
Let me reach out privately on DingTalk; I have a few questions and would appreciate your help with them.


--
From: Jark Wu 
Sent: Thursday, January 6, 2022, 16:43
To: user-zh 
Subject: Re: The mailing list archive is not accessible

The nabble service is down; please use this address instead: https://lists.apache.org/list.html?d...@flink.apache.org

On Fri, 31 Dec 2021 at 18:29, Ada Wong  wrote:

> I wanted to read the discussion from back then, but this link is not accessible.
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html
>


Re: flinkCDC2.1.1

2022-01-06 Thread Jark Wu
Flink CDC issues can be reported at
https://github.com/ververica/flink-cdc-connectors/issues

On Thu, 30 Dec 2021 at 14:08, Liu Join  wrote:

> Reading MySQL data with flinkCDC 2.1.1, an error is thrown after running for a while.
> Image-hosting link: error screenshot
>
>
>
> Sent from Mail for Windows
>
>
>


Re: The mailing list archive is not accessible

2022-01-06 Thread Jark Wu
The nabble service is down; please use this address instead: https://lists.apache.org/list.html?d...@flink.apache.org

On Fri, 31 Dec 2021 at 18:29, Ada Wong  wrote:

> I wanted to read the discussion from back then, but this link is not accessible.
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html
>


Re: flink mysql cdc sync reports an unrecognized field

2022-01-06 Thread Jark Wu
This error log should not be related; it is an error from the REST client, not from the normal data processing flow.

mysql-cdc has no Jackson JSON parsing code.

On Wed, 5 Jan 2022 at 17:09, Fei Han 
wrote:

>
> @all:
> Flink MySQL CDC data sync reports an unrecognized field. What could be causing this? Could it be an unrecognized keyword? The error log is as follows:
>
>  httpResponseStatus=200 OK}
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
> Unrecognized field "status" (class
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as
> ignorable (one known property: "errors"])
>  at [Source: UNKNOWN; line: -1, column: -1] (through reference chain:
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1686)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1635)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1390)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:483)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_211]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
>


Re: StaticFileSplitEnumerator - keeping track of already processed files

2022-01-06 Thread Fabian Paul
Hi,

I think your analysis is correct. One thing to note here: when implementing
the StaticFileSplitEnumerator, I guess we only thought about the batch case,
where no checkpoints exist [1]. On the other hand, as you have noted, it is
possible to run a bounded source in streaming mode.

That said, the current implementation already checkpoints the remaining
splits of the StaticFileSplitEnumerator, so it should be easy to also pass
the alreadyDiscoveredPaths to the StaticFileSplitEnumerator.

@Krzysztof Chmielewski can you create a ticket for that?
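
For reference, the scenario being discussed is roughly the sketch below — a
bounded FileSource over a fixed list of files, executed in streaming mode with
checkpointing. The paths, the projection schema and the print sink are
placeholders only; a real job would write to Kafka instead.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class BoundedFileSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // streaming execution with checkpoints

        // Projection schema; field name and type are placeholders.
        RowType rowType = RowType.of(new LogicalType[] {new DoubleType()}, new String[] {"f7"});
        ParquetColumnarRowInputFormat<FileSourceSplit> format =
                new ParquetColumnarRowInputFormat<>(new Configuration(), rowType, 500, false, true);

        // A bounded source over a fixed list of files -- the case discussed in this thread.
        FileSource<RowData> source =
                FileSource.forBulkFileFormat(
                                format,
                                new Path("s3://bucket/input/part-1.parquet"),
                                new Path("s3://bucket/input/part-2.parquet"))
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-parquet-files")
                .print(); // stand-in for the Kafka sink

        env.execute("bounded-file-source-example");
    }
}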

Best,
Fabian


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
 wrote:
>
> Hi,
> Yes, I know that ContinuousFileSplitEnumerator continuously scans the
> monitored folder for new files and StaticFileSplitEnumerator does not;
> this is clear.
>
> However, I was asking about a different scenario: the scenario where we are
> restoring from a checkpoint.
> FileSource can process many files, not only one. The underlying API uses an
> array of paths, not just a single Path.
>
> If I understand correctly, when we are recovering from a checkpoint, for
> example due to a Job Manager issue, the FileEnumerator will create an array
> of Splits and pass it to StaticFileSplitEnumerator.
>
> The same goes for ContinuousFileSplitEnumerator. However, when
> ContinuousFileSplitEnumerator is started, it iterates through the Path[]
> array and checks which files were already processed, skipping them via the
> pathsAlreadyProcessed set and hence not creating Splits for those files.
>
> However, it seems that StaticFileSplitEnumerator will reprocess files that
> were already used for Split creation. In case of checkpoint restoration it
> does not check if a file was already processed.
>
> Regards,
> Krzysztof Chmielewski
>
>
>
>
> On Thu, Jan 6, 2022, 03:21 Caizhi Weng  wrote:
>>
>> Hi!
>>
>> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>>
>> This is because ContinuousFileSplitEnumerator has to continuously add new 
>> files to splitAssigner, while StaticFileSplitEnumerator does not. The 
>> pathsAlreadyProcessed set records the paths already discovered by 
>> ContinuousFileSplitEnumerator so that it will not add the same file to 
>> splitAssigner twice. For StaticFileSplitEnumerator it does not need to 
>> discover new files and all files have already been recorded in its 
>> splitAssigner so it does not need the pathsAlreadyProcessed set.
>>
>> For more detailed logic check the caller of the constructors of both 
>> enumerators.
>>
>> Krzysztof Chmielewski  wrote on Thu, Jan 6, 2022 at 07:04:
>>>
>>> Hi,
>>> Why does StaticFileSplitEnumerator from FileSource not track the already
>>> processed files, similar to how ContinuousFileSplitEnumerator does?
>>>
>>> I'm thinking about a scenario where we have a bounded FileSource that reads a
>>> lot of files and streams its content to Kafka. If there is a
>>> job/cluster restart, we will process the same files again.
>>>
>>> Regards,
>>> Krzysztof Chmielewski


Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
Thanks, Caizhi, for your explanation. It helped me understand where I
went wrong.

On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng  wrote:

> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
>   (parsedJson.toString(),
> validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
>   (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar  wrote on Wed, Jan 5, 2022 at 19:04:
>
>> I have written a process function where I parse the JSON; if it is not in
>> the expected format, it is passed as a Failure to the process function and I
>> print those records, which is working fine. Now I was trying to print the
>> message and the record in the case of both Success and Failure. I
>> implemented the code below and it gave me the error. What exactly am I
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>   val outputTag = new OutputTag[String]("failed")
>>
>>   def parseJson(json: String): Either[String, String] = {
>> val schemaJsonString =
>>   """
>> {
>> "$schema": "http://json-schema.org/draft-04/schema#;,
>> "title": "Product",
>> "description": "A product from the catalog",
>> "type": "object",
>> "properties": {
>> "id": {
>> "description": "The unique identifier for a product",
>> "type": "integer"
>> },
>> "premium": {
>> "description": "Annual Premium",
>> "type": "integer"
>> },
>> "eventTime": {
>> "description": "Timestamp at which record has arrived at source 
>> / generated",
>> "type": "string"
>> }
>> },
>> "required": ["id", "premium","eventTime"]
>> }
>> """
>> Try {
>>   val schema = 
>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>   // You can read a JSON object from String, a file, URL, etc.
>>   val parsedJson = new ObjectMapper().readTree(json)
>>   val validationMessages = schema.validate(parsedJson).asScala
>>   //validationMessages.foreach(msg => println(msg.getMessage))
>>   require(validationMessages.isEmpty)
>>   //parsedJson.toString()
>>   if(validationMessages.isEmpty)
>> {
>>   
>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>> }
>>   else
>> {
>>   (parsedJson.toString(),"Format is correct...")
>> }
>>
>> }
>> match {
>>   case Success(x) => {
>> println("Good: " + x)
>> Right(x)
>>   }
>>   case Failure(err) => {
>> println("Bad:  " + json)
>> Left(json)
>>   }
>> }
>>   }
>>   override def processElement(i: String, context: ProcessFunction[String, 
>> String]#Context, collector: Collector[String]): Unit = {
>> parseJson(i) match {
>>   case Right(data) => {
>> collector.collect(data)
>> println("Good Records: " + data)
>>   }
>>   case Left(json) => {
>> context.output(outputTag, json)
>> println("Bad Records: " + json)
>>   }
>> }
>>   }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>>  found   : (String, Any)
>>  required: String
>> Right(x)
>>
>>


Re: StaticFileSplitEnumerator - keeping track of already processed files

2022-01-06 Thread Krzysztof Chmielewski
Hi,
Yes, I know that ContinuousFileSplitEnumerator continuously scans the
monitored folder for new files and StaticFileSplitEnumerator does not;
this is clear.

However, I was asking about a different scenario: the scenario where we are
restoring from a checkpoint.
FileSource can process many files, not only one. The underlying API uses an
array of paths, not just a single Path.

If I understand correctly, when we are recovering from a checkpoint, for
example due to a Job Manager issue, the FileEnumerator will create an array
of Splits and pass it to StaticFileSplitEnumerator.

The same goes for ContinuousFileSplitEnumerator. However, when
ContinuousFileSplitEnumerator is started, it iterates through the Path[]
array and checks which files were already processed, skipping them via the
pathsAlreadyProcessed set and hence not creating Splits for those files.

However, it seems that StaticFileSplitEnumerator will reprocess files that
were already used for Split creation. In case of checkpoint restoration it
does not check if a file was already processed.

Regards,
Krzysztof Chmielewski




On Thu, Jan 6, 2022, 03:21 Caizhi Weng 
wrote:

> Hi!
>
> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>
> This is because ContinuousFileSplitEnumerator has to continuously add new
> files to splitAssigner, while StaticFileSplitEnumerator does not.
> The pathsAlreadyProcessed set records the paths already discovered
> by ContinuousFileSplitEnumerator so that it will not add the same file to
> splitAssigner twice. For StaticFileSplitEnumerator it does not need to
> discover new files and all files have already been recorded in its
> splitAssigner so it does not need the pathsAlreadyProcessed set.
>
> For more detailed logic check the caller of the constructors of both
> enumerators.
>
> Krzysztof Chmielewski  wrote on Thu, Jan 6, 2022
> at 07:04:
>
>> Hi,
>> Why does StaticFileSplitEnumerator from FileSource not track the already
>> processed files, similar to how ContinuousFileSplitEnumerator does?
>>
>> I'm thinking about a scenario where we have a bounded FileSource that reads
>> a lot of files and streams its content to Kafka. If there is a
>> job/cluster restart, we will process the same files again.
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>


Re: extending RichSinkFunction doesn't force to implement any of its methods

2022-01-06 Thread Siddhesh Kalgaonkar
Hi Caizhi,

Thanks for your reply, much appreciated. I understand the difference now.
Also, I have a flow like Kafka DataStream -> process function
(separate class) -> Cassandra sink (separate class).

The process function returns the output as a string, and now I want to create
a DataStream out of that string variable so that I can call something like
ds.addSink(new CassandraSink()). For that, I used the StreamExecution
variable as a global/method variable, but I am not able to create it
properly. Could you please refer to my StackOverflow post mentioned in the
main thread?
What is happening is that if I don't create the data stream properly, it
doesn't call the sink properly, because it doesn't execute the methods under
the Cassandra sink class.

What should I do?
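
Concretely, the wiring I have in mind looks roughly like the minimal Java
sketch below (my real code is in Scala, but the shape is the same; class and
method names here are made up): the sink is attached directly to the stream
returned by process(), rather than building a new DataStream from the
returned string.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

public class CassandraSinkSketch extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open the Cassandra session here (called once per parallel sink instance).
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Called once for every record that reaches the sink; write `value` to Cassandra here.
    }

    @Override
    public void close() throws Exception {
        // Close the Cassandra session here.
    }

    // Wiring: attach the sink to the stream produced by the process function.
    public static void wire(DataStream<String> kafkaStream) {
        DataStream<String> parsed = kafkaStream.process(new JsonProcessFunction());
        parsed.addSink(new CassandraSinkSketch());
    }

    // Placeholder standing in for the JSON-parsing process function.
    private static class JsonProcessFunction extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value);
        }
    }
}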

On Thu, Jan 6, 2022 at 7:58 AM Caizhi Weng  wrote:

> Hi!
>
> This is because ProcessFunction#processElement is a must while all methods
> in SinkFunction are not mandatory (for example you can create a sink which
> just discards all records by directly implementing SinkFunction). However
> if you want your sink to be more useful you'll have to see which methods in
> SinkFunction you need to implement. For example you can deal with the
> records fed to the sink in the invoke method or clean up the resources in
> the finish method.
>
> Siddhesh Kalgaonkar  wrote on Thu, Jan 6, 2022 at 03:11:
>
>> I have implemented a Cassandra sink, and when I try to call it from
>> another class via a DataStream it does not call any of the methods. I tried
>> extending other interfaces like ProcessFunction and they force me to
>> implement their methods, whereas RichSinkFunction doesn't
>> force me to do so. Is my problem due to this, or is there something else to
>> it?
>>
>>
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375
>>
>