Re: Flink Kafka connector in Python

2020-07-14 Thread Xingbo Huang
Hi Manas,
If you want to return a RowType from a Python UDF, you can use the Row class,
which extends Python's tuple.
You can import it with: from pyflink.table import Row
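
For example, a minimal, untested sketch based on the UDF from your earlier
mail (the JSON keys '0001'/'0002', the field names 'feature1'/'feature2' and
the DOUBLE result types are assumptions; adjust them to your data):

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, Row, StreamTableEnvironment
from pyflink.table.udf import udf, ScalarFunction

t_env = StreamTableEnvironment.create(
    StreamExecutionEnvironment.get_execution_environment())

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # Row extends tuple; positional values map onto the ROW fields in order.
        return Row(data['0001'], data['0002'])

data_converter = udf(DataConverter(),
                     input_types=[DataTypes.STRING()],
                     result_type=DataTypes.ROW([
                         DataTypes.FIELD("feature1", DataTypes.DOUBLE()),
                         DataTypes.FIELD("feature2", DataTypes.DOUBLE())]))

t_env.register_function("data_converter", data_converter)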

Best,
Xingbo

Manas Kale wrote on Mon, Jul 6, 2020 at 8:08 PM:

> I also tried doing this by using a User Defined Function.
>
> class DataConverter(ScalarFunction):
> def eval(self, str_data):
> data = json.loads(str_data)
> return ?? # I want to return data['0001'] in field 
> 'feature1', data['0002'] in field 'feature2' etc.
>
> t_env.register_function("data_converter", udf(DataConverter(), input_types = 
> [DataTypes.STRING()],
>   result_type =
>   DataTypes.ROW([
>   DataTypes.FIELD("feature1", 
> DataTypes.STRING())
>   ])))
>
>
> t_env.from_path(INPUT_TABLE) \
> .select("data_converter(data)") \ # <--- here "data" is the field "data" 
> from the previous mail
> .insert_into(OUTPUT_TABLE)
>
>
> I used a ROW to hold multiple values but I can't figure out how I can
> return a populated ROW object from the eval() method. Where is the method
> to construct a row/field object and return it?
>
>
> Thanks!
>
>
> On Fri, Jul 3, 2020 at 12:40 PM Manas Kale  wrote:
>
>> Hi Xingbo,
>> Thanks for the reply; I didn't know that a table schema also needs to be
>> declared after the connector, but I understand now.
>> I have another question: how do I write the parsing schemas for a field
>> that itself is a valid JSON string? For example:
>> {
>> "monitorId": 865,
>> "deviceId": "94:54:93:49:96:13",
>> "data":
>> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
>> "state": 2,
>> "time": 1593687809180
>> }
>> The field "data" is a string of valid JSON with string:number objects.
>> I'm currently trying to use a JSON schema object and DataTypes.ROW, but am
>> getting deserialization errors.
>>
>> .with_format(
>> Json()
>> .json_schema(
>> """
>> {
>> "type": "object",
>> "properties": {
>> "monitorId": {
>> "type": "string"
>> },
>> "deviceId": {
>> "type": "string"
>> },
>> "data": {
>> "type": "object"
>> },
>> "state": {
>> "type": "integer"
>> },
>> "time": {
>> "type": "string"
>> }
>> }
>> }
>> """
>> )
>> ) \
>> .with_schema(
>> Schema()
>> .field("monitorId", DataTypes.STRING())
>> .field("deviceId", DataTypes.STRING())
>> .field("data", DataTypes.ROW())
>> )
>>
>> Regards,
>>
>> Manas
>>
>>
>> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:
>>
>>> Hi, Manas
>>> You need to define the schema. You can refer to the following example:
>>>  t_env.connect(
>>> Kafka()
>>> .version('0.11')
>>> .topic(INPUT_TOPIC)
>>> .property("bootstrap.servers", PROD_KAFKA)
>>> .property("zookeeper.connect", "localhost:2181")
>>> .start_from_latest()
>>> ) \
>>> .with_format(
>>> Json()
>>> .json_schema(
>>> "{"
>>> "  type: 'object',"
>>> "  properties: {"
>>> "lon: {"
>>> "  type: 'number'"
>>> "},"
>>> "rideTime: {"
>>> "  type: 'string',"
>>> "  format: 'date-time'"
>>> "}"
>>> "  }"
>>> "}"
>>> )
>>> ) \
>>> .with_schema(  # declare the schema of the table
>>> Schema()
>>> .field("lon", DataTypes.DECIMAL(20, 10))
>>> .field("rideTime", DataTypes.TIMESTAMP(6))
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale wrote on Thu, Jul 2, 2020 at 7:59 PM:
>>>
 Hi,
 I'm trying to get a simple consumer/producer running using the
 following code referenced from the provided links:

 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
 StreamTableEnvironment
 from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema

 exec_env = StreamExecutionEnvironment.get_execution_environment()

 t_config = TableConfig()
 t_env = StreamTableEnvironment.create(exec_env, t_config)

 INPUT_TOPIC = 'xyz'
 INPUT_TABLE = 'raw_message'
 PROD_ZOOKEEPER = '...'
 PROD_KAFKA = '...'

 OUTPUT_TOPIC = 'summary_output'
 OUTPUT_TABLE = 'feature_summary'
 LOCAL_ZOOKEEPER = 'localhost:2181'
 LOCAL_KAFKA = 'localhost:9092'


 t_env.connect(
 Kafka()
 .version('universal')
 .topic(INPUT_TOPIC)
 .property("bootstrap.servers", PROD_KAFKA)

 .start_from_latest()
 ) \
 

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function.

class DataConverter(ScalarFunction):
def eval(self, str_data):
data = json.loads(str_data)
return ?? # I want to return data['0001'] in field
'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(DataConverter(),
input_types = [DataTypes.STRING()],
  result_type =
  DataTypes.ROW([

DataTypes.FIELD("feature1", DataTypes.STRING())
  ])))


t_env.from_path(INPUT_TABLE) \
.select("data_converter(data)") \ # <--- here "data" is the field
"data" from the previous mail
.insert_into(OUTPUT_TABLE)


I used a ROW to hold multiple values but I can't figure out how I can
return a populated ROW object from the eval() method. Where is the method
to construct a row/field object and return it?


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale  wrote:

> Hi Xingbo,
> Thanks for the reply; I didn't know that a table schema also needs to be
> declared after the connector, but I understand now.
> I have another question: how do I write the parsing schemas for a field
> that itself is a valid JSON string? For example:
> {
> "monitorId": 865,
> "deviceId": "94:54:93:49:96:13",
> "data":
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
> "state": 2,
> "time": 1593687809180
> }
> The field "data" is a string of valid JSON with string:number objects. I'm
> currently trying to use a JSON schema object and DataTypes.ROW, but am getting
> deserialization errors.
>
> .with_format(
> Json()
> .json_schema(
> """
> {
> "type": "object",
> "properties": {
> "monitorId": {
> "type": "string"
> },
> "deviceId": {
> "type": "string"
> },
> "data": {
> "type": "object"
> },
> "state": {
> "type": "integer"
> },
> "time": {
> "type": "string"
> }
> }
> }
> """
> )
> ) \
> .with_schema(
> Schema()
> .field("monitorId", DataTypes.STRING())
> .field("deviceId", DataTypes.STRING())
> .field("data", DataTypes.ROW())
> )
>
> Regards,
>
> Manas
>
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:
>
>> Hi, Manas
>> You need to define the schema. You can refer to the following example:
>>  t_env.connect(
>> Kafka()
>> .version('0.11')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>> .property("zookeeper.connect", "localhost:2181")
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("lon", DataTypes.DECIMAL(20, 10))
>> .field("rideTime", DataTypes.TIMESTAMP(6))
>> ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> Manas Kale wrote on Thu, Jul 2, 2020 at 7:59 PM:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code referenced from the provided links:
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>>> StreamTableEnvironment
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TOPIC = 'xyz'
>>> INPUT_TABLE = 'raw_message'
>>> PROD_ZOOKEEPER = '...'
>>> PROD_KAFKA = '...'
>>>
>>> OUTPUT_TOPIC = 'summary_output'
>>> OUTPUT_TABLE = 'feature_summary'
>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>> LOCAL_KAFKA = 'localhost:9092'
>>>
>>>
>>> t_env.connect(
>>> Kafka()
>>> .version('universal')
>>> .topic(INPUT_TOPIC)
>>> .property("bootstrap.servers", PROD_KAFKA)
>>>
>>> .start_from_latest()
>>> ) \
>>> .with_format(
>>> Json()
>>> .json_schema(
>>> "{"
>>> "  type: 'object',"
>>> "  properties: {"
>>> "lon: {"
>>> "  type: 'number'"
>>> "},"
>>> "rideTime: {"
>>> "  type: 'string',"
>>> "  format: 'date-time'"
>>> "}"
>>> "  }"
>>> "}"
>>> )
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> t_env.connect(Kafka()
>>> .version('universal')
>>> 

Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
Hi Xingbo,
Thanks for the reply; I didn't know that a table schema also needs to be
declared after the connector, but I understand now.
I have another question: how do I write the parsing schemas for a field
that itself is a valid JSON string? For example:
{
"monitorId": 865,
"deviceId": "94:54:93:49:96:13",
"data":
"{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
"state": 2,
"time": 1593687809180
}
The field "data" is a string of valid JSON with string:number objects. I'm
currently trying to use a JSON schema object and DataTypes.ROW, but am getting
deserialization errors.

.with_format(
Json()
.json_schema(
"""
{
"type": "object",
"properties": {
"monitorId": {
"type": "string"
},
"deviceId": {
"type": "string"
},
"data": {
"type": "object"
},
"state": {
"type": "integer"
},
"time": {
"type": "string"
}
}
}
"""
)
) \
.with_schema(
Schema()
.field("monitorId", DataTypes.STRING())
.field("deviceId", DataTypes.STRING())
.field("data", DataTypes.ROW())
)

Regards,

Manas


On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:

> Hi, Manas
> You need to define the schema. You can refer to the following example:
>  t_env.connect(
> Kafka()
> .version('0.11')
> .topic(INPUT_TOPIC)
> .property("bootstrap.servers", PROD_KAFKA)
> .property("zookeeper.connect", "localhost:2181")
> .start_from_latest()
> ) \
> .with_format(
> Json()
> .json_schema(
> "{"
> "  type: 'object',"
> "  properties: {"
> "lon: {"
> "  type: 'number'"
> "},"
> "rideTime: {"
> "  type: 'string',"
> "  format: 'date-time'"
> "}"
> "  }"
> "}"
> )
> ) \
> .with_schema(  # declare the schema of the table
> Schema()
> .field("lon", DataTypes.DECIMAL(20, 10))
> .field("rideTime", DataTypes.TIMESTAMP(6))
> ).register_table_source(INPUT_TABLE)
>
> Best,
> Xingbo
>
> Manas Kale wrote on Thu, Jul 2, 2020 at 7:59 PM:
>
>> Hi,
>> I'm trying to get a simple consumer/producer running using the following
>> code referenced from the provided links:
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>> StreamTableEnvironment
>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TOPIC = 'xyz'
>> INPUT_TABLE = 'raw_message'
>> PROD_ZOOKEEPER = '...'
>> PROD_KAFKA = '...'
>>
>> OUTPUT_TOPIC = 'summary_output'
>> OUTPUT_TABLE = 'feature_summary'
>> LOCAL_ZOOKEEPER = 'localhost:2181'
>> LOCAL_KAFKA = 'localhost:9092'
>>
>>
>> t_env.connect(
>> Kafka()
>> .version('universal')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>>
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ).register_table_source(INPUT_TABLE)
>>
>> t_env.connect(Kafka()
>> .version('universal')
>> .topic(OUTPUT_TOPIC)
>> .property("bootstrap.servers", LOCAL_KAFKA)
>>
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>>"{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )).register_table_sink(OUTPUT_TABLE)
>>
>> t_env.from_path(INPUT_TABLE) \
>> .insert_into(OUTPUT_TABLE)
>>
>> t_env.execute('IU pyflink job')
>>
>> *However, I am getting the following exception : *
>>
>> Traceback (most recent call last):
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>  line 147, in deco
>> return f(*a, **kw)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>>  line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling 
>> o32.registerTableSource.
>> : org.apache.flink.table.api.TableException: findAndCreateTableSource 

Re: Flink Kafka connector in Python

2020-07-02 Thread Xingbo Huang
Hi, Manas
You need to define the schema. You can refer to the following example:
 t_env.connect(
Kafka()
.version('0.11')
.topic(INPUT_TOPIC)
.property("bootstrap.servers", PROD_KAFKA)
.property("zookeeper.connect", "localhost:2181")
.start_from_latest()
) \
.with_format(
Json()
.json_schema(
"{"
"  type: 'object',"
"  properties: {"
"lon: {"
"  type: 'number'"
"},"
"rideTime: {"
"  type: 'string',"
"  format: 'date-time'"
"}"
"  }"
"}"
)
) \
.with_schema(  # declare the schema of the table
Schema()
.field("lon", DataTypes.DECIMAL(20, 10))
.field("rideTime", DataTypes.TIMESTAMP(6))
).register_table_source(INPUT_TABLE)
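
The sink registration needs a schema as well. A rough, untested sketch that
mirrors the schema above (it reuses OUTPUT_TOPIC / LOCAL_KAFKA / OUTPUT_TABLE
from your code and assumes the Json descriptor's derive_schema() is available;
otherwise repeat the json_schema string used for the source):

t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
) \
    .with_format(
        Json()
        .derive_schema()  # derive the JSON format from the table schema below
    ) \
    .with_schema(  # the sink also needs a declared schema
        Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
    ).register_table_sink(OUTPUT_TABLE)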

Best,
Xingbo

Manas Kale wrote on Thu, Jul 2, 2020 at 7:59 PM:

> Hi,
> I'm trying to get a simple consumer/producer running using the following
> code referenced from the provided links:
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
> StreamTableEnvironment
> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
>
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> INPUT_TOPIC = 'xyz'
> INPUT_TABLE = 'raw_message'
> PROD_ZOOKEEPER = '...'
> PROD_KAFKA = '...'
>
> OUTPUT_TOPIC = 'summary_output'
> OUTPUT_TABLE = 'feature_summary'
> LOCAL_ZOOKEEPER = 'localhost:2181'
> LOCAL_KAFKA = 'localhost:9092'
>
>
> t_env.connect(
> Kafka()
> .version('universal')
> .topic(INPUT_TOPIC)
> .property("bootstrap.servers", PROD_KAFKA)
>
> .start_from_latest()
> ) \
> .with_format(
> Json()
> .json_schema(
> "{"
> "  type: 'object',"
> "  properties: {"
> "lon: {"
> "  type: 'number'"
> "},"
> "rideTime: {"
> "  type: 'string',"
> "  format: 'date-time'"
> "}"
> "  }"
> "}"
> )
> ).register_table_source(INPUT_TABLE)
>
> t_env.connect(Kafka()
> .version('universal')
> .topic(OUTPUT_TOPIC)
> .property("bootstrap.servers", LOCAL_KAFKA)
>
> .start_from_latest()
> ) \
> .with_format(
> Json()
> .json_schema(
>"{"
> "  type: 'object',"
> "  properties: {"
> "lon: {"
> "  type: 'number'"
> "},"
> "rideTime: {"
> "  type: 'string',"
> "  format: 'date-time'"
> "}"
> "  }"
> "}"
> )).register_table_sink(OUTPUT_TABLE)
>
> t_env.from_path(INPUT_TABLE) \
> .insert_into(OUTPUT_TABLE)
>
> t_env.execute('IU pyflink job')
>
> *However, I am getting the following exception : *
>
> Traceback (most recent call last):
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 147, in deco
> return f(*a, **kw)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o32.registerTableSource.
> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>   at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>   at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>   at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find the 
> required schema in property 

Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
Hi,
I'm trying to get a simple consumer/producer running using the following
code referenced from the provided links:

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment,
DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema

exec_env = StreamExecutionEnvironment.get_execution_environment()

t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'

OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'


t_env.connect(
Kafka()
.version('universal')
.topic(INPUT_TOPIC)
.property("bootstrap.servers", PROD_KAFKA)

.start_from_latest()
) \
.with_format(
Json()
.json_schema(
"{"
"  type: 'object',"
"  properties: {"
"lon: {"
"  type: 'number'"
"},"
"rideTime: {"
"  type: 'string',"
"  format: 'date-time'"
"}"
"  }"
"}"
)
).register_table_source(INPUT_TABLE)

t_env.connect(Kafka()
.version('universal')
.topic(OUTPUT_TOPIC)
.property("bootstrap.servers", LOCAL_KAFKA)

.start_from_latest()
) \
.with_format(
Json()
.json_schema(
   "{"
"  type: 'object',"
"  properties: {"
"lon: {"
"  type: 'number'"
"},"
"rideTime: {"
"  type: 'string',"
"  format: 'date-time'"
"}"
"  }"
"}"
)).register_table_sink(OUTPUT_TABLE)

t_env.from_path(INPUT_TABLE) \
.insert_into(OUTPUT_TABLE)

t_env.execute('IU pyflink job')

*However, I am getting the following exception : *

Traceback (most recent call last):
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling
o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not
find the required schema in property 'schema'.
at 
org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
at 
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
... 13 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46,
in 
).register_table_source(INPUT_TABLE)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
line 1295, in register_table_source
self._j_connect_table_descriptor.registerTableSource(name)
  File 

Re: Flink Kafka connector in Python

2020-06-30 Thread Manas Kale
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas

On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:

> Hi Manas,
>
> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
> So the method described in the link won't work.
> But you can use the more convenient DDL[1] or descriptor[2] API to read Kafka
> data. You can also refer to the common questions about PyFlink[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>
> Best,
> Xingbo
>
> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>
>> Hi,
>> I want to consume from and write to Kafka using Flink's Python API.
>>
>> The only way I found to do this was through this question on SO, where the
>> user essentially copies FlinkKafka connector JARs into the
>> Flink runtime's lib/ directory.
>>
>>- Is this the recommended method to do this? If not, what is?
>>- Is there any official documentation for using Kafka with pyFlink?
>>Is this officially supported?
>>    - How does the method described in the link work? Does the Flink
>>    runtime load and expose all JARs in /lib to the Python script? Can I write
>>    custom operators in Java and use those through Python?
>>
>> Thanks,
>> Manas
>>
>


Re: Flink Kafka connector in Python

2020-06-29 Thread Xingbo Huang
Hi Manas,

Since Flink 1.9, the entire architecture of PyFlink has been redesigned, so
the method described in the link won't work.
Instead, you can use the more convenient DDL[1] or descriptor[2] API to read
Kafka data; a rough sketch of the DDL route follows the links below.
You can also refer to the common questions about PyFlink[3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
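
To make the DDL route [1] concrete, here is a rough, untested sketch of a
Kafka JSON source registered via DDL (the table, topic, server and field names
are placeholders, and the WITH properties follow the Flink 1.10 docs; adjust
them to your setup):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    StreamExecutionEnvironment.get_execution_environment())

# Register a Kafka source table whose JSON format is derived from the schema.
t_env.sql_update("""
    CREATE TABLE kafka_source (
        user_id STRING,
        amount DOUBLE
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'my_topic',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    )
""")

t_env.from_path('kafka_source').print_schema()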

Best,
Xingbo

Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:

> Hi,
> I want to consume from and write to Kafka using Flink's Python API.
>
> The only way I found to do this was through this question on SO, where the
> user essentially copies FlinkKafka connector JARs into the
> Flink runtime's lib/ directory.
>
>- Is this the recommended method to do this? If not, what is?
>- Is there any official documentation for using Kafka with pyFlink? Is
>this officially supported?
>    - How does the method described in the link work? Does the Flink
>    runtime load and expose all JARs in /lib to the Python script? Can I write
>    custom operators in Java and use those through Python?
>
> Thanks,
> Manas
>