Re: [EXTERNAL] Re:Flink SQL and createRemoteEnvironment

2023-11-30 Thread Oxlade, Dan
Hi Xuyang,

thanks for the reply.

I've included a complete example below. I have a working Flink cluster running in 
AWS and accepting connections over HTTPS.

I think I'm probably just very confused about how this is meant to work.
What I expected:
  - The executeSql statements to be executed remotely on the Flink cluster.
  - The final executeSql to create a streaming job in the remote Flink cluster which:
    - watches for parquet files landing in s3://mybucket/parquet (e.g. 
s3://mybucket/parquet/in.parquet)
    - appends those rows to the Iceberg table `out`

What I see:
  - All SQL executed locally.
  - A job created in the remote Flink cluster:
    - after it appears to have executed the SQL locally
    - that looks like it has created a connection from `in` to `out` 
but doesn't process records.

If I just create the `in` table below and stream it to print(), I do see rows 
printed locally in the console.
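That local print test is essentially the following (a minimal sketch; the table name, path and schema match the listing below, and the `print` table connector stands in for the DataStream print()):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrintTest {
    public static void main(String[] args) throws Exception {
        // Local (in-process) environment: no createRemoteEnvironment here.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same source definition as in the full listing below.
        tEnv.executeSql("""
                CREATE TABLE `in`(
                    `a` DOUBLE, `b` DOUBLE, `c` DOUBLE, `d` STRING
                ) WITH (
                    'connector' = 'filesystem',
                    'path' = 's3://mybucket/parquet/',
                    'format' = 'parquet',
                    'source.monitor-interval' = '5s'
                )
                """);

        // 'print' connector sink: rows show up on the local console.
        tEnv.executeSql("""
                CREATE TABLE `console`(
                    `a` DOUBLE, `b` DOUBLE, `c` DOUBLE, `d` STRING
                ) WITH ('connector' = 'print')
                """);

        // await() keeps the client alive while the streaming job runs.
        tEnv.executeSql("INSERT INTO `console` SELECT * FROM `in`").await();
    }
}
```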

Full listing below.
Thanks
Dan

import java.time.LocalDateTime;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkCatalog;

import static org.apache.flink.configuration.SecurityOptions.SSL_REST_ENABLED;
// clusterHadoopConf() is Iceberg's helper for picking up the cluster Hadoop config.
import static org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf;

public class Example {

    public static void main(String[] args) throws Exception {
        org.apache.flink.configuration.Configuration configuration =
                new org.apache.flink.configuration.Configuration();
        configuration.setBoolean(SSL_REST_ENABLED, true);

        try (StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "myflinkcluster.example.com",
                443,
                configuration)) {

            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                    EnvironmentSettings.newInstance()
                            .inStreamingMode()
                            .build());

            var catalogName = "foo";
            var dbName = "baa";

            Configuration hadoopConf = clusterHadoopConf();
            hadoopConf.set("hive.vectorized.execution.enabled", "false");

            tEnv.registerCatalog(catalogName, new FlinkCatalog(
                    catalogName,
                    "default",
                    Namespace.empty(),
                    CatalogLoader.custom(catalogName,
                            Map.of(
                                    "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
                                    "s3.sse.type", "kms",
                                    "catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog",
                                    "glue.skip-name-validation", "true",
                                    "type", "iceberg",
                                    "warehouse", "s3://mybucket/warehouse",
                                    "lock.table", "my-dynamo-table-data-lock",
                                    "lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager"
                            ),
                            hadoopConf,
                            "org.apache.iceberg.aws.glue.GlueCatalog"),
                    true,
                    -1));

            tEnv.executeSql("CREATE DATABASE IF NOT EXISTS `%s`.`%s`;".formatted(catalogName, dbName));
            tEnv.executeSql("DROP TABLE IF EXISTS `in`;");
            tEnv.executeSql("""
                    CREATE TABLE `in`(
                        `a` DOUBLE,
                        `b` DOUBLE,
                        `c` DOUBLE,
                        `d` STRING
                    ) WITH (
                        'connector' = 'filesystem',
                        'path' = 's3://mybucket/parquet/',
                        'format' = 'parquet',
                        'source.monitor-interval' = '5s'
                    );
                    """);

            tEnv.executeSql("""
                    CREATE TABLE IF NOT EXISTS `%s`.`%s`.`out`(
                        `a` DOUBLE,
                        `b` DOUBLE,
                        `c` DOUBLE,
                        `d` STRING
                    );
                    """.formatted(catalogName, dbName));

            tEnv.executeSql("""
                    INSERT INTO `%s`.`%s`.`out`
                    SELECT `a`, `b`, `c`, `d`
                    FROM `in`;
                    """.formatted(catalogName, dbName));

            env.executeAsync("IceBerger " + LocalDateTime.now());
        }
    }
}
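One detail that may matter here (my assumption, based on the TableResult API): executeSql on an INSERT statement submits its own job immediately and returns, so the trailing env.executeAsync(...) is a second, separate submission with nothing in it. A sketch for checking where the INSERT job actually went:

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class SubmitCheck {

    // Submits the INSERT and prints the id of the job it spawned.
    // If submission went to the remote cluster, that id should appear
    // in the remote cluster's web UI.
    static void insertAndReport(TableEnvironment tEnv, String catalogName, String dbName) {
        TableResult result = tEnv.executeSql(
                "INSERT INTO `%s`.`%s`.`out` SELECT `a`, `b`, `c`, `d` FROM `in`"
                        .formatted(catalogName, dbName));

        result.getJobClient()
                .map(JobClient::getJobID)
                .ifPresent(id -> System.out.println("Submitted job: " + id));
    }
}
```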


From: Xuyang 
Sent: 28 November 2023 03:02
To: Oxlade, Dan 
Cc: user@flink.apache.org 
Subject: [EXTERNAL] Re:Flink SQL and createRemoteEnvironment


Re:Flink SQL and createRemoteEnvironment

2023-11-27 Thread Xuyang
Hi, Dan.
Can you provide more details? 


> I'm seeing unexpected behavior where it appears like the sql is executed 
> locally. 
Did you find a minicluster started locally running your program?


> In my case the remote environment is inside AWS and it doesn't appear to pick 
> up the region and credentials unless I set the environment variables locally
I think you need to make sure your local machine can connect to the AWS environment 
first.


Overall, I think `StreamExecutionEnvironment#createRemoteEnvironment` can meet 
your requirements.
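For example (a minimal sketch; the host, port and jar path are placeholders), you can pass your job jar to createRemoteEnvironment so its classes are shipped to the cluster rather than only existing on the local client:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RemoteSketch {
    public static void main(String[] args) throws Exception {
        // The jar paths in the varargs are uploaded to the cluster, so user
        // classes (formats, catalogs, UDFs) resolve on the remote TaskManagers.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "myflinkcluster.example.com",  // placeholder host
                443,                           // placeholder port
                "/path/to/your-job.jar");      // placeholder user jar

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // SQL is parsed and planned on the client; INSERT jobs run on the cluster.
    }
}
```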




--

Best!
Xuyang




At 2023-11-28 03:49:44, "Oxlade, Dan"  wrote:

Hi,


If I use StreamExecutionEnvironment.createRemoteEnvironment and then 
var tEnv = StreamTableEnvironment.create(env) from the resulting remote 
StreamExecutionEnvironment, will any SQL executed using tEnv.executeSql be 
executed remotely inside the Flink cluster?


I'm seeing unexpected behavior where it appears like the sql is executed 
locally. In my case the remote environment is inside AWS and it doesn't appear 
to pick up the region and credentials unless I set the environment variables 
locally - I want the job to run inside the cluster and use the remote AWS 
context, including things like the auth credentials of the AWS task.


I feel like I might be fundamentally misunderstanding.


Thanks
Dan
