[ https://issues.apache.org/jira/browse/BAHIR-220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yuemeng updated BAHIR-220:
--------------------------
    Description:
Currently, as of Flink 1.9.0, we can use the catalog to store our stream table sources and sinks.
For the Redis connector, there should be a Redis table sink so that we can register it in the catalog and use Redis as a table in the SQL environment.
{code}
// Describe the Redis connection: cluster mode, INCRBY_EX command with a TTL.
Redis redis = new Redis()
        .mode(RedisVadidator.REDIS_CLUSTER)
        .command(RedisCommand.INCRBY_EX.name())
        .ttl(100000)
        .property(RedisVadidator.REDIS_NODES, REDIS_HOST + ":" + REDIS_PORT);

// Register the descriptor as a table sink named "redis" with a (k, v) schema.
tableEnvironment
        .connect(redis)
        .withSchema(new Schema()
                .field("k", TypeInformation.of(String.class))
                .field("v", TypeInformation.of(Long.class)))
        .registerTableSink("redis");

// Write to Redis through SQL.
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
env.execute("Test Redis Table");
{code}

  was:
Currently, as of Flink 1.9.0, we can use the catalog to store our stream table source and sink meta.
For the Redis connector, there should be a Redis table sink so that we can register it in the catalog and use Redis as a table in the SQL environment.
{code}
// Describe the Redis connection: cluster mode, INCRBY_EX command with a TTL.
Redis redis = new Redis()
        .mode(RedisVadidator.REDIS_CLUSTER)
        .command(RedisCommand.INCRBY_EX.name())
        .ttl(100000)
        .property(RedisVadidator.REDIS_NODES, REDIS_HOST + ":" + REDIS_PORT);

// Register the descriptor as a table sink named "redis" with a (k, v) schema.
tableEnvironment
        .connect(redis)
        .withSchema(new Schema()
                .field("k", TypeInformation.of(String.class))
                .field("v", TypeInformation.of(Long.class)))
        .registerTableSink("redis");

// Write to Redis through SQL.
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
env.execute("Test Redis Table");
{code}

> Add redis descriptor to make redis connection as a table
> --------------------------------------------------------
>
>                 Key: BAHIR-220
>                 URL: https://issues.apache.org/jira/browse/BAHIR-220
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Flink Streaming Connectors
>   Affects Versions: Flink-1.0
>            Reporter: yuemeng
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)