Re: no suitable table factory for file sources

2020-02-02 Thread Jingsong Li
Hi Günter,

At the moment, the file system connector only supports the OldCsv format [1],
but your YAML uses properties of the new CSV format, such as "format.allow-comments".

You can check the properties supported by the old CSV format in [2]; a small
programmatic sketch using the old CSV descriptor follows the links below.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#old-csv-format
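
If it helps, here is a rough, untested sketch of the same source registered
programmatically with the Flink 1.9 descriptor API instead of the SQL client
YAML (path and field name taken from your file; the exact builder calls are my
assumption, please double-check them against the javadocs):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class OldCsvSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // OldCsv declares its fields explicitly; it has no schema/derive-schema/
        // allow-comments keys like the new CSV format.
        tEnv.connect(new FileSystem().path("file:///home/swissbib/temp/trash/hello.txt"))
            .withFormat(new OldCsv()
                .field("test", Types.STRING)
                .fieldDelimiter(";"))
            .withSchema(new Schema().field("test", Types.STRING))
            .inAppendMode()
            .registerTableSource("Guenter");
    }
}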

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 1:00 PM Günter Hipler wrote:

> Hi,
>
> I can't read from a file source using the sql-client tool.
>
> I just set up a simple test scenario with the configuration file in [1]
>
> I'm getting the error in [2] starting the environment with
> bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
> in the standard Flink 1.9.1 download environment.
>
> Reading the documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
> I expected the file system connector to be "built in".
>
> I'm getting a similar issue while following the sql-training tutorial
> at https://github.com/ververica/sql-training/wiki. There I modified the
> sql-client-conf.yaml file used by the SQL client's Docker container.
>
> Thanks for any hints
>
> Günter
>
>
>
>
> [1]
>
>
> 
> # Define table sources here. See the Table API & SQL documentation for details.
>
> tables:
>   - name: Guenter
>     type: source-table
>     update-mode: append
>     connector:
>       type: filesystem
>       path: file:///home/swissbib/temp/trash/hello.txt
>
>     format:
>       type: csv
>       # required: define the schema either by using type information
>       schema: "ROW(test STRING)"
>
>       # or use the table's schema
>       derive-schema: true
>
>       field-delimiter: ";"         # optional: field delimiter character (',' by default)
>       line-delimiter: "\r\n"       # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
>       quote-character: "'"         # optional: quote character for enclosing field values ('"' by default)
>       allow-comments: true         # optional: ignores comment lines that start with "#" (disabled by default);
>                                    #   if enabled, make sure to also ignore parse errors to allow empty rows
>       ignore-parse-errors: true    # optional: skip fields and rows with parse errors instead of failing;
>                                    #   fields are set to null in case of errors
>       array-element-delimiter: "|" # optional: the array element delimiter string for separating
>                                    #   array and row element values (";" by default)
>       escape-character: "\\"       # optional: escape character for escaping values (disabled by default)
>       null-literal: "n/a"          # optional: null literal string that is interpreted as a
>                                    #   null value (disabled by default)
>
> #==============================================================================
> # Execution properties
> #==============================================================================
>
> # Execution properties allow for changing the behavior of a table program.
>
> execution:
>   #planner: blink
>   type: streaming              # 'batch' or 'streaming' execution
>   result-mode: table           # 'changelog' or 'table' presentation of results
>   parallelism: 1               # parallelism of the program
>   max-parallelism: 128         # maximum parallelism
>   min-idle-state-retention: 0  # minimum idle state retention in ms
>   max-idle-state-retention: 0  # maximum idle state retention in ms
>
> #==============================================================================
> # Deployment properties
> #==============================================================================
>
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted to.
>
> deployment:
>   type: standalone             # only the 'standalone' deployment is supported
>   response-timeout: 5000       # general cluster communication timeout in ms
>   gateway-address: ""          # (optional) address from cluster to gateway
>   gateway-port: 0              # (optional) port from cluster to gateway
>
> [2]
>
> bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
> Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
> No session environment specified.
> Validating current environment...
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
>     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
> Caused by: org.apache.flink.table.client.gatew

Re: Read data from Oracle using Flink SQL API

2020-02-02 Thread Jingsong Li
Yes. And I think we should add OracleDialect, SqlServerDialect and DB2Dialect
support too.
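
For illustration, a minimal Oracle dialect plus table source could look roughly
like the sketch below (untested, written against the 1.9/1.10 flink-jdbc
classes; the JDBC URL, table name and schema are made up):

import java.util.Optional;

import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class OracleDialectSketch {

    // Minimal dialect as suggested above: only canHandle and defaultDriverName
    // are implemented, the rest falls back to the interface defaults.
    public static class OracleDialect implements JDBCDialect {
        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:oracle:");
        }

        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("oracle.jdbc.OracleDriver");
        }
    }

    public static JDBCTableSource buildSource() {
        JDBCOptions options = JDBCOptions.builder()
                .setDBUrl("jdbc:oracle:thin:@//localhost:1521/ORCLPDB1") // made-up URL
                .setTableName("MY_TABLE")                                // made-up table
                .setUsername("user")
                .setPassword("secret")
                .setDialect(new OracleDialect())
                .build();

        TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.BIGINT())
                .field("name", DataTypes.STRING())
                .build();

        return JDBCTableSource.builder()
                .setOptions(options)
                .setSchema(schema)
                .build();
    }
}

The source can then be registered with
tableEnv.registerTableSource("oracle_table", OracleDialectSketch.buildSource()).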

Best,
Jingsong Lee

On Sun, Feb 2, 2020 at 5:53 PM Flavio Pompermaier wrote:

> Ok, thanks for this info! Maybe this could be added to the
> documentation... what do you think?
>
> On Sun, Feb 2, 2020, 08:37 Jingsong Li wrote:
>
>> Hi Flavio,
>>
>> You can use `JDBCTableSource` and register it via
>>  TableEnvironment.registerTableSource. You need to provide
>>  an OracleDialect; implementing just `canHandle` and
>>  `defaultDriverName` should be enough.
>>
>> Best,
>> Jingsong Lee
>>
>> On Sun, Feb 2, 2020 at 2:42 PM Jark Wu  wrote:
>>
>>> Hi Flavio,
>>>
>>> If you want to adjust the writing statement for Oracle, you can
>>> implement the JDBCDialect for Oracle and pass it to the JDBCUpsertTableSink
>>> when constructing it via `JDBCOptions.Builder#setDialect`. This way, you
>>> don't need to recompile the source code of flink-jdbc.
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 31 Jan 2020 at 19:28, Flavio Pompermaier wrote:
>>>
 Hi to all,
 I was looking at the Flink SQL APIs and I discovered that only a few
 drivers are supported [1], i.e. MySQL, Postgres and Derby. You could have
 problems only on the writing side of the connector (TableSink), because you
 need to adjust the override statement, but for the read part you shouldn't
 have problems with dialects... am I wrong?
 And what am I supposed to do right now if I want to connect to Oracle
 using the Table API? Do I have to use the low level JDBCInputFormat? Is
 there an easy way to connect to Oracle using the Table API without the need
 to modify and recompile the source code of Flink (just adding some
 interface implementation in the application JAR)?

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector

 Best,
 Flavio

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


no suitable table factory for file sources

2020-02-02 Thread Günter Hipler

Hi,

I can't read from a file source using the sql-client tool.

I just set up a simple test scenario with the configuration file in [1]

I'm getting the error in [2] starting the environment with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in the standard Flink 1.9.1 download environment.

Reading the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the file system connector to be "built in".


I'm getting a similar issue while following the sql-training tutorial
at https://github.com/ververica/sql-training/wiki. There I modified the
sql-client-conf.yaml file used by the SQL client's Docker container.


Thanks for any hints

Günter




[1]


# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt

    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"

      # or use the table's schema
      derive-schema: true

      field-delimiter: ";"         # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"       # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"         # optional: quote character for enclosing field values ('"' by default)
      allow-comments: true         # optional: ignores comment lines that start with "#" (disabled by default);
                                   #   if enabled, make sure to also ignore parse errors to allow empty rows
      ignore-parse-errors: true    # optional: skip fields and rows with parse errors instead of failing;
                                   #   fields are set to null in case of errors
      array-element-delimiter: "|" # optional: the array element delimiter string for separating
                                   #   array and row element values (";" by default)
      escape-character: "\\"       # optional: escape character for escaping values (disabled by default)
      null-literal: "n/a"          # optional: null literal string that is interpreted as a
                                   #   null value (disabled by default)

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone             # only the 'standalone' deployment is supported
  response-timeout: 5000       # general cluster communication timeout in ms
  gateway-address: ""          # (optional) address from cluster to gateway
  gateway-port: 0              # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:
connector.path=file://

[ANNOUNCE] Weekly Community Update 2020/05

2020-02-02 Thread Konstantin Knauf
Dear community,

comparatively quiet times on the dev@ mailing list, so I will keep it brief
and mostly share release-related updates today.

Flink Development
==

* [releases] Apache Flink 1.9.2 was released on Thursday. Check out the
release blog post [1] for details. [2]

* [releases] Gary has published and started a vote on the first release
candidate for Flink 1.10. The vote has failed due to incorrect license
information in one of the newly added modules. The community also found a
handful of additional issues, some of which might need to be fixed for the
next RC. [3]

* [docker] The Apache Flink community has unanimously approved the
integration of the Docker image publication into the Apache Flink release
process, which means https://github.com/docker-flink/docker-flink will move
under the Apache Flink project and the Apache Flink Docker images will
become official Flink releases approved by the Apache Flink PMC. [4]

[1] https://flink.apache.org/news/2020/01/30/release-1.9.2.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-9-2-released-tp37102.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-10-0-release-candidate-1-tp36985.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-Integrate-Flink-Docker-image-publication-into-Flink-release-process-tp37096.html

Notable Bugs
==

I am not aware of any notable user-facing bug in any of the released
versions.

Events, Blog Posts, Misc
===

* My colleague *Seth Wiesman* has published an excellent blog post on *state
evolution* in Flink covering state schema evolution, the State Processor
API and a look ahead. [5]

* Upcoming Meetups
  * On February 6, *Alexander Fedulov* will talk about Stateful Stream
Processing with Apache Flink at the R-Ladies meetup in Kyiv. [6]

[5]
https://flink.apache.org/news/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink.html
[6] https://www.meetup.com/rladies-kyiv/events/267948988/

Cheers,

Konstantin (@snntrable)

-- 

Konstantin Knauf | Solutions Architect



Re: Reinterpreting a pre-partitioned data stream as keyed stream

2020-02-02 Thread Guowei Ma
Hi,

1. Is the key that is used by the keyBy after point 1 exactly the same as
the key used in 4a and 4b? If yes, I think you could use
reinterpretAsKeyedStream to avoid the shuffle (see the sketch after this list).
2. You could use SingleOutputStreamOperator::setChainingStrategy to disable
chaining, or use rebalance/shuffle between the two operators if you don't
care about the order.
3. If you use unorderedWait, the order would not be preserved even if
you use keyBy after point 1.
4. BTW, why do you not want the operators chained together?
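
A small, self-contained sketch of point 1 (reinterpretAsKeyedStream lives in
DataStreamUtils and is marked experimental; the tuple stream and key below are
made up for illustration):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assume this stream is already partitioned by f0 exactly as an upstream
        // keyBy would partition it. reinterpretAsKeyedStream does NOT shuffle;
        // it only declares the existing partitioning to be a keyed partitioning.
        DataStream<Tuple2<String, Long>> prePartitioned =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

        KeyedStream<Tuple2<String, Long>, String> keyed =
                DataStreamUtils.reinterpretAsKeyedStream(
                        prePartitioned,
                        new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) {
                                return value.f0;
                            }
                        });

        // Downstream keyed operations (windows, KeyedProcessFunction, ...) can
        // now be applied without an additional keyBy/shuffle.
        keyed.sum(1).print();
        env.execute("reinterpretAsKeyedStream sketch");
    }
}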

Best,
Guowei


On Thu, Jan 30, 2020 at 7:54 PM KristoffSC wrote:

> Hi,
> thank you for the answer.
>
> I think I understand.
>
> In my use case I have to keep the order of events for each key, but I don't
> have to process keys in the same order in which I received them. At one point
> of my pipeline I'm also using a SessionWindow.
>
> My Flink environment has operator chaining enabled. I would say that some
> of my operators can be chained.
>
> My pipeline is (each point is an operator after Flink's operator chaining
> mechanism):
> 1. ActiveMQ connector + mapper, all with parallelism 1 (btw I'm using an
> org.apache.bahir connector for ActiveMQ which does not support parallelism
> bigger than 1)
> 2. Enrichment, where I'm using AsyncDataStream.unorderedWait with
> parallelism 5.
> 3. Event split based on some criteria (not keyBy) that dispatches my stream
> into two "sub-streams"
> 4. Both sub-streams are keyed
> 4a. Sub-stream "A" has a session window applied - parallelism 6.
> 4b. Sub-stream "B" has no windowing and no aggregation, but has business
> logic for which the order of events matters - parallelism 6.
> 5. Sink for both streams.
>
>
> If I understand you and the documentation correctly, redistributing will
> forward messages keeping the order per key, but events for different keys
> can be delivered in a different order:
> "So in this example, the ordering within each key is preserved, but the
> parallelism does introduce non-determinism regarding the order in which the
> aggregated results for different keys arrive at the sink."
>
> Then I could use a keyBy at the beginning of the pipeline, just after point 1.
> But to use a window in point 4a and my process function in 4b I need to have
> a keyedStream. I'm using a KeyedProcessFunction there. What are my options
> here?
>
>
> P.S.
> Regarding the operator chaining, I'm aware that there is an API that allows
> me to model which operators should be chained together and which should not,
> even if they have the same parallelism level.
>
> Thanks,
> Krzysztof
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


time column used by timer

2020-02-02 Thread lec ssmi
In KeyedProcessFunction, we can register a timer based on event time, but the
register method does not require us to specify a time column. So if the
elements of this KeyedStream are not of a class for which a timestamp and
watermark were originally specified, can this timer still work normally?
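
For reference, registering an event-time timer in a KeyedProcessFunction looks
roughly like the sketch below. Event-time timers fire when the watermark passes
the registered timestamp, so the elements themselves need no dedicated time
column, but timestamps and watermarks must have been assigned somewhere
upstream (in the source or via assignTimestampsAndWatermarks); the one-minute
offset is just an example:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerSketch extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) {
        // ctx.timestamp() is the element's event timestamp; it is null if no
        // timestamps were assigned upstream.
        Long ts = ctx.timestamp();
        if (ts != null) {
            // Fire once the watermark reaches ts + 60s.
            ctx.timerService().registerEventTimeTimer(ts + 60_000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}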


Re: Read data from Oracle using Flink SQL API

2020-02-02 Thread Flavio Pompermaier
Ok, thanks for this info! Maybe this could be added to the
documentation... what do you think?

On Sun, Feb 2, 2020, 08:37 Jingsong Li wrote:

> Hi Flavio,
>
> You can use `JDBCTableSource` and register it via
>  TableEnvironment.registerTableSource. You need to provide
>  an OracleDialect; implementing just `canHandle` and
>  `defaultDriverName` should be enough.
>
> Best,
> Jingsong Lee
>
> On Sun, Feb 2, 2020 at 2:42 PM Jark Wu  wrote:
>
>> Hi Flavio,
>>
>> If you want to adjust the writing statement for Oracle, you can implement
>> the JDBCDialect for Oracle and pass it to the JDBCUpsertTableSink when
>> constructing it via `JDBCOptions.Builder#setDialect`. This way, you don't
>> need to recompile the source code of flink-jdbc.
>>
>> Best,
>> Jark
>>
>> On Fri, 31 Jan 2020 at 19:28, Flavio Pompermaier wrote:
>>
>>> Hi to all,
>>> I was looking at the Flink SQL APIs and I discovered that only a few
>>> drivers are supported [1], i.e. MySQL, Postgres and Derby. You could have
>>> problems only on the writing side of the connector (TableSink), because you
>>> need to adjust the override statement, but for the read part you shouldn't
>>> have problems with dialects... am I wrong?
>>> And what am I supposed to do right now if I want to connect to Oracle
>>> using the Table API? Do I have to use the low level JDBCInputFormat? Is
>>> there an easy way to connect to Oracle using the Table API without the need
>>> to modify and recompile the source code of Flink (just adding some
>>> interface implementation in the application JAR)?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
>>>
>>> Best,
>>> Flavio
>>>
>>
>
> --
> Best, Jingsong Lee
>