Hi all. According to the official documentation (Table API / JDBC SQL Connector v1.14.0), "the JDBC connector allows reading data from and writing data into any relational databases with a JDBC driver". At the moment we are using SQL Server in conjunction with Flink and Java, which works perfectly fine. Now we are trying to fetch data from a Kafka topic and write it to a SQL Server sink using PyFlink. We succeeded in fetching the data from the Kafka topic, but were not able to establish a connection to SQL Server.
Our code looks as follows:
"""Stream records from a Kafka topic into a SQL Server table via PyFlink.

NOTE(review): the "Cannot handle such jdbc url" IllegalStateException is
raised by JdbcDynamicTableFactory.validateConfigOptions.  The JDBC
connector bundled with Flink 1.13/1.14 only ships dialects for Derby,
MySQL and PostgreSQL, so a 'jdbc:sqlserver://...' URL is rejected no
matter which driver jar is on the classpath.  Writing to SQL Server
needs a connector version that includes a SQL Server dialect (or a
custom JdbcDialect) — TODO confirm against the connector release notes.
"""
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    StreamTableEnvironment,
    DataTypes,
    EnvironmentSettings,
    CsvTableSink,
    WriteMode,
)
from pyflink.table.descriptors import Schema, Kafka, Json


def main():
    """Create the Kafka source and JDBC sink tables, then run the INSERT job."""
    # Streaming environment with the blink planner.
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    tbl_env = StreamTableEnvironment.create(
        stream_execution_environment=env,
        environment_settings=settings,
    )

    # Connector jars are expected to sit next to this script.
    here = os.path.abspath(os.path.dirname(__file__))
    kafka_jar = os.path.join(here, 'flink-sql-connector-kafka_2.11-1.13.0.jar')
    jdbc_jar = os.path.join(here, 'flink-connector-jdbc_2.11-1.13.2.jar')
    mssql_jar = os.path.join(here, 'mssql-jdbc-8.2.2.jre8.jar')

    # pipeline.jars carries the connector fat jars; pipeline.classpaths
    # carries the JDBC driver itself.  Both take ';'-separated file:// URLs.
    tbl_env.get_config().get_configuration() \
        .set_string("parallelism.default", "1") \
        .set_string("pipeline.jars",
                    "file:///{};file:///{}".format(kafka_jar, jdbc_jar)) \
        .set_string("pipeline.classpaths", "file:///{}".format(mssql_jar))

    # Source: Kafka topic read as JSON over SSL.  [..] marks redacted values.
    kafka_table_sql = """
        CREATE TABLE kafka (
            [..] VARCHAR,
            data ROW(
                [..] ROW(
                    [..] VARCHAR,
                    [..] VARCHAR
            ))
        ) WITH (
            'connector' = 'kafka',
            'property-version' = 'universal',
            'properties.bootstrap.servers' = '[..]',
            'topic' = '[..]',
            'scan.startup.mode' = 'earliest-offset',
            'properties.security.protocol' = 'SSL',
            'properties.ssl.endpoint.identification.algorithm' = '',
            'properties.ssl.truststore.location' = '[..]',
            'properties.ssl.truststore.password' = '[..]',
            'properties.ssl.keystore.type' = 'JKS',
            'properties.ssl.keystore.location' = '[..]',
            'properties.ssl.keystore.password' = '[..]',
            'properties.ssl.key.password' = '[..]',
            'properties.group.id' = '[..]',
            'format' = 'json'
        )
    """

    # Sink: SQL Server over JDBC.  The original post had an unterminated
    # quote on the 'password' option and unquoted keystore passwords above;
    # every option value must be a quoted SQL string literal.
    sqlserver_table_sql = """
        CREATE TABLE sqltest (
            [..] VARCHAR,
            [..] VARCHAR
        ) WITH (
            'connector' = 'jdbc',
            'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
            'url' = 'jdbc:sqlserver://db-server/database-name',
            'username' = '[..]',
            'password' = '[..]',
            'table-name' = 'dbo.tablename'
        )
    """

    # Register source and sink, then copy data from one to the other.
    tbl_env.execute_sql(kafka_table_sql)
    tbl_env.execute_sql(sqlserver_table_sql)
    tbl_env.execute_sql(
        "INSERT INTO sqltest SELECT [..], [..] FROM kafka"
    ).wait()


if __name__ == '__main__':
    main()
Which led to an exception (java.lang.IllegalStateException: Cannot handle such
jdbc url ..):
Traceback (most recent call last):
File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in <module>
main()
File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main
tbl_env.execute_sql("[..]").wait()
File
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py",
line 804, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\util\exceptions.py",
line 146, in deco
return f(*a, **kw)
File "C:\projects\flink\flink-evn\lib\site-packages\py4j\protocol.py", line
328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.sqltest'.
Table options are:
'connector'='jdbc'
'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver'
'password'='[..]'
'table-name'='[..]'
'url'='jdbc:sqlserver:// [..]'
'username'='[..]'
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
jdbc:sqlserver:// [..]
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:304)
at
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSink(JdbcDynamicTableFactory.java:172)
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
... 29 more
How can we correctly establish a connection to SQL Server using PyFlink and the
Table API?
Any suggestions are highly appreciated.
Thanks
Diese E-Mail ist ausschliesslich für den Adressaten bestimmt. Sollten Sie diese
E-Mail irrtümlich erhalten haben oder wünschen Sie künftig keine Kontakte mehr
per E-Mail, bitten wir Sie, die Bank Cler hierüber sofort zu orientieren. Die
irrtümlich erhaltene E-Mail ist mit allen Anhängen unwiderruflich zu löschen,
allfällige Ausdrucke sind zu vernichten und auf die Verwendung des Inhalts ist
zu verzichten. Der Versand unverschlüsselter E-Mail birgt erhebliche Risiken in
sich (mangelnde Vertraulichkeit, Manipulation von Inhalt/Absender, Fehlleitung,
Viren etc.). Bank Cler lehnt jede Haftung für Schäden hieraus ab. Bank Cler
akzeptiert grundsätzlich keine per E-Mail übermittelten Aufträge, Widerrufe von
Aufträgen oder sonstige Weisungen etc., ohne verpflichtet zu sein, diese
ausdrücklich zurück zu weisen. Kündigungen von Verträgen per E-Mail sind nicht
rechtswirksam.
smime.p7s
Description: S/MIME cryptographic signature
