Re: Extract/Interpret embedded byte data from a record

2021-04-15 Thread Xingbo Huang
Hi Sumeet,

Python row-based operations will be supported in release 1.13. I guess
you are looking at the code of the master branch. Since you are using the
Python Table API, you can use a Python UDF to parse your data. For the
details of Python UDFs, you can refer to the doc [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
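
For example, a minimal sketch of such a UDF could look like this (the
embedded payload schema, the "temperature" field and the fastavro
dependency are assumptions for illustration, not something prescribed by
Flink):

import io
import json

import fastavro  # third-party library; must be shipped with the job
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Hypothetical schema of the embedded "Result" payload; replace with the real one.
PAYLOAD_SCHEMA = fastavro.parse_schema({
    "name": "Payload",
    "type": "record",
    "fields": [{"name": "temperature", "type": "double"}],
})

@udf(result_type=DataTypes.DOUBLE())
def extract_temperature(result, result_encoding):
    # Dispatch on the media type carried in the ResultEncoding column.
    if result_encoding == "application/json":
        payload = json.loads(result)
    elif result_encoding == "application/avro":
        payload = fastavro.schemaless_reader(io.BytesIO(result), PAYLOAD_SCHEMA)
    else:
        raise ValueError("unknown Result encoding: %s" % result_encoding)
    return payload["temperature"]

The UDF can then be called directly in a Table API expression, e.g.
t.select(extract_temperature(t.Result, t.ResultEncoding)).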

Best,
Xingbo


Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Additional observation: from the Flink repo, the file
"flink-python/pyflink/table/table.py" seems to support map(), flat_map()
and other row-based operations, although the 1.12 documentation doesn't
reflect that. Is that correct? From the code, it appears that these
operations are supported in Python.

Thanks,
Sumeet


Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly the
Table API. The documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
suggests that the Map() function is not currently supported in Python. So
what do you think my options would be here? Should I convert the table to a
data stream to perform this in Python?
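
For reference, a rough sketch of what such a conversion might look like in
PyFlink 1.12 (the source table name, the row type layout and the
deserialize helper are assumptions for illustration, not tested code):

import json

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# "my_source" is a hypothetical table whose columns mirror the Avro schema:
# Name, ID, Result, ResultEncoding.
table = t_env.from_path("my_source")

# The row type info must match the table's columns.
ds = t_env.to_append_stream(
    table,
    Types.ROW([Types.STRING(), Types.STRING(),
               Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.STRING()]))

def deserialize(row):
    # Hypothetical helper; handles only the JSON case for brevity.
    if row[3] == "application/json":
        payload = json.loads(bytes(row[2]))
        return row[0], str(payload)
    return row[0], None

ds.map(deserialize, output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))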

Thanks again,
Sumeet


Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Piotr Nowojski
Hi,

One thing that you can do is to read this record using Avro, keeping
`Result` as `bytes`, and then, in a subsequent mapping function, change the
record type and deserialize the result. In the Data Stream API:

source.map(new MapFunction<record_with_raw_result, record_with_deserialized_result>() { ... })

Best,
Piotrek


Extract/Interpret embedded byte data from a record

2021-04-13 Thread Sumeet Malhotra
Hi,

I'm reading data from Kafka, which is Avro encoded and has the following
general schema:

{
  "name": "SomeName",
  "doc": "Avro schema with variable embedded encodings",
  "type": "record",
  "fields": [
    {
      "name": "Name",
      "doc": "My name",
      "type": "string"
    },
    {
      "name": "ID",
      "doc": "My ID",
      "type": "string"
    },
    {
      "name": "Result",
      "doc": "Result data, could be encoded differently",
      "type": "bytes"
    },
    {
      "name": "ResultEncoding",
      "doc": "Result encoding media type (e.g. application/avro, application/json)",
      "type": "string"
    }
  ]
}

Basically, the "Result" field is bytes whose interpretation depends upon
the "ResultEncoding" field, i.e. either Avro or JSON. The "Result" byte
stream also has its own well-defined schema.
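
For illustration, a decoded record might look like this (the values and the
embedded JSON payload are made up):

record = {
    "Name": "sensor-7",
    "ID": "abc-123",
    "Result": b'{"temperature": 21.5, "status": "OK"}',  # JSON-encoded payload
    "ResultEncoding": "application/json",
}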

My use case involves extracting/aggregating data from within the embedded
"Result" field. What would be the best approach to perform this runtime
decoding and extraction of fields from the embedded byte data? Would
user-defined functions help in this case?

Thanks in advance!
Sumeet