Re: Batch loading into postgres database

2020-12-09 Thread Dawid Wysakowicz
Your approach looks rather good to me.

In the version that queries the JobStatus, keep in mind that there are
non-terminal states such as INITIALIZING, which only tells you that the
job was submitted, not that it has finished.

In 1.12 we introduced the TableResult#await method, which is a shortcut
for what you did in your last version.
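
A minimal sketch of how that could look on 1.12 (reusing the statementSet
from your snippet below):

// TableResult#await blocks until the inserts reach a terminal state and
// throws if the job fails, so the JDBC follow-up step can run right after it.
val result = statementSet.execute()
result.await() // or await(timeout, unit) to bound the wait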

Best,

Dawid

On 08/12/2020 22:40, Dylan Forciea wrote:
>
> After a bit more playing around with it today, I figured out that what
> I needed to call was:
>
>  
>
> statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get()
>
>  
>
> The fact that getJobExecutionResult required a classloader is what
> threw me off. Since I’m using an application cluster, I didn’t really
> think about the fact that I would need to supply one. It might be nice
> to have a flavor of this that just uses the default system class loader.
>
>  
>
> However, I’m still interested in whether there is a better way to handle
> reloading a table or tables in active use than the series of steps
> that I went through, if anybody has any suggestions!
>
>  
>
> Thanks,
>
> Dylan Forciea
>
>  
>
> From: Dylan Forciea 
> Date: Monday, December 7, 2020 at 5:33 PM
> To: "user@flink.apache.org" 
> Subject: Re: Batch loading into postgres database
>
>  
>
> As a follow up – I’m trying to follow the approach I outlined below,
> and I’m having trouble figuring out how to perform the delete/insert
> step after the job is complete.
>
>  
>
> I’ve tried adding a job listener, like so, but that doesn’t seem to
> ever get fired off:
>
>  
>
> val statementSet = streamTableEnv.createStatementSet()
> statementSet.addInsertSql("""
>   INSERT INTO table1_staging
>     SELECT * FROM table
>     """)
>
> statementSet.addInsertSql("""
>   INSERT INTO table2_staging
>     SELECT * FROM table
>     """)
>
> statementSet.addInsertSql("""
>   INSERT INTO table3_staging
>     SELECT * FROM table3
>     """)
>
> streamEnv.registerJobListener(new JobListener() {
>   override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {}
>
>   override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = {
>     val time = Option(result).map(_.getNetRuntime())
>     if (throwable == null) {
>       Log.info(s"Completed job successfully in $time milliseconds")
>     } else {
>       Log.error(s"Unable to execute job successfully", throwable)
>     }
>   }
> })
> statementSet.execute()
>
>  
>
> I tried the above with the execute before and after the register, but
> it doesn’t seem to fire in any case.
>
>  
>
> I also tried this:
>
>  
>
> Try(statementSet.execute().getJobClient().get().getJobStatus().join())
>   .map { _ =>
>     Log.info(s"Completed job successfully")
>   }
>   .recover {
>     case t => {
>       Log.error(s"Unable to execute job successfully", t)
>     }
>   }
>
>  
>
> And this seems to have fired WAY before the job actually finished
> flowing all the data through. I tried both join and get on the job
> status CompletableFuture.
>
>  
>
> Is there anything I’m missing as far as being able to tell when the
> job is complete? Again, this is Flink 1.11.2 that I’m running.
>
>  
>
> Thanks,
>
> Dylan Forciea
>
>  
>
> From: Dylan Forciea 
> Date: Monday, December 7, 2020 at 8:04 AM
> To: "user@flink.apache.org" 
> Subject: Batch loading into postgres database
>
>  
>
> I am setting up a Flink job that will reload a table in a postgres
> database using the Flink SQL functionality. I just wanted to make sure
> that given the current feature set I am going about this the correct
> way. I am currently using version 1.11.2, but plan on upgrading to
> 1.12 as soon as it is finalized.
>
>  
>
> I have set up a staging table and a final table in a postgres database.
> My plan is to have a Flink application that will truncate the contents
> of the staging table using JDBC before the job begins, run the job to
> completion, and then use JDBC to delete/insert into the final table
> from the staging table in a transaction after the job completes.
>
>  
>
> Is this the expected way to interact with postgres in a batch job like
> this? Or is there some functionality or method that I am missing?
>
>  
>
> Regards,
>
> Dylan Forciea
>




Re: Batch loading into postgres database

2020-12-08 Thread Dylan Forciea
After a bit more playing around with it today, I figured out that what I needed 
to call was:

statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get()

The fact that getJobExecutionResult required a classloader is what threw me 
off. Since I’m using an application cluster, I didn’t really think about the 
fact that I would need to supply one. It might be nice to have a flavor of this 
that just uses the default system class loader.
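
For reference, a sketch of the shape this takes end to end (assuming the thread
context class loader is an acceptable substitute for getClass().getClassLoader()
here):

// Block until the batch job finishes (Flink 1.11), then run the JDBC step.
// getJobExecutionResult needs a user class loader; the context class loader
// is assumed to work in an application cluster.
val jobClient = statementSet.execute().getJobClient().get()
jobClient
  .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
  .get() // resolves once the job terminates; throws if it failed
// the staging tables are fully populated at this point, so the
// delete/insert into the final tables can run safely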

However, I’m still interested in whether there is a better way to handle 
reloading a table or tables in active use than the series of steps that I went 
through, if anybody has any suggestions!

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 5:33 PM
To: "user@flink.apache.org" 
Subject: Re: Batch loading into postgres database

As a follow up – I’m trying to follow the approach I outlined below, and I’m 
having trouble figuring out how to perform the delete/insert step after the job 
is complete.

I’ve tried adding a job listener, like so, but that doesn’t seem to ever get 
fired off:

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO table1_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table2_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table3_staging
SELECT * FROM table3
""")

streamEnv.registerJobListener(new JobListener() {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {}

  override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = {
    val time = Option(result).map(_.getNetRuntime())
    if (throwable == null) {
      Log.info(s"Completed job successfully in $time milliseconds")
    } else {
      Log.error(s"Unable to execute job successfully", throwable)
    }
  }
})
statementSet.execute()

I tried the above with the execute before and after the register, but it 
doesn’t seem to fire in any case.

I also tried this:

Try(statementSet.execute().getJobClient().get().getJobStatus().join())
  .map { _ =>
Log.info(s"Completed job successfully")
  }
  .recover {
case t => {
  Log.error(s"Unable to execute job successfully", t)
}
  }

And this seems to have fired WAY before the job actually finished flowing all 
the data through. I tried both join and get on the job status CompletableFuture.

Is there anything I’m missing as far as being able to tell when the job is 
complete? Again, this is Flink 1.11.2 that I’m running.

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 8:04 AM
To: "user@flink.apache.org" 
Subject: Batch loading into postgres database

I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 as soon as it is finalized.

I have set up a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table using JDBC before the job begins, run the job to completion, and then use 
JDBC to delete/insert into the final table from the staging table in a 
transaction after the job completes.

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea


Re: Batch loading into postgres database

2020-12-07 Thread Dylan Forciea
As a follow up – I’m trying to follow the approach I outlined below, and I’m 
having trouble figuring out how to perform the delete/insert step after the job 
is complete.

I’ve tried adding a job listener, like so, but that doesn’t seem to ever get 
fired off:

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO table1_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table2_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table3_staging
SELECT * FROM table3
""")

streamEnv.registerJobListener(new JobListener() {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {}

  override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = {
    val time = Option(result).map(_.getNetRuntime())
    if (throwable == null) {
      Log.info(s"Completed job successfully in $time milliseconds")
    } else {
      Log.error(s"Unable to execute job successfully", throwable)
    }
  }
})
statementSet.execute()

I tried the above with the execute before and after the register, but it 
doesn’t seem to fire in any case.

I also tried this:

Try(statementSet.execute().getJobClient().get().getJobStatus().join())
  .map { _ =>
Log.info(s"Completed job successfully")
  }
  .recover {
case t => {
  Log.error(s"Unable to execute job successfully", t)
}
  }

And this seems to have fired WAY before the job actually finished flowing all 
the data through. I tried both join and get on the job status CompletableFuture.
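
Presumably the future completes with the job’s current status rather than
waiting for a terminal one; if that is the case, something like the following
polling loop would be needed instead (sketch only, untested):

// Sketch (untested): poll the job status until it reaches a terminal state.
val jobClient = statementSet.execute().getJobClient().get()
var status = jobClient.getJobStatus().get()
while (!status.isGloballyTerminalState) {
  Thread.sleep(1000) // arbitrary polling interval
  status = jobClient.getJobStatus().get()
}
Log.info(s"Job reached terminal state: $status")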

Is there anything I’m missing as far as being able to tell when the job is 
complete? Again, this is Flink 1.11.2 that I’m running.

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 8:04 AM
To: "user@flink.apache.org" 
Subject: Batch loading into postgres database

I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 as soon as it is finalized.

I have set up a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table using JDBC before the job begins, run the job to completion, and then use 
JDBC to delete/insert into the final table from the staging table in a 
transaction after the job completes.

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea


Batch loading into postgres database

2020-12-07 Thread Dylan Forciea
I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 as soon as it is finalized.

I have set up a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table using JDBC before the job begins, run the job to completion, and then use 
JDBC to delete/insert into the final table from the staging table in a 
transaction after the job completes.
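
For the last step, the rough shape I have in mind is plain JDBC along these 
lines (sketch only; the connection details and table names are placeholders):

import java.sql.DriverManager

// Swap the staging data into the final table atomically in one transaction.
val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
try {
  conn.setAutoCommit(false)
  val stmt = conn.createStatement()
  stmt.executeUpdate("DELETE FROM final_table")
  stmt.executeUpdate("INSERT INTO final_table SELECT * FROM staging_table")
  conn.commit() // both statements become visible to readers atomically
} catch {
  case t: Throwable =>
    conn.rollback()
    throw t
} finally {
  conn.close()
}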

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea