Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread rahul patwari
Hi Kenn, Rui,

The pipeline that we are trying is exactly what Kenn has mentioned above
i.e.
Read From Kafka => Apply Fixed Windows of 1 Min => SqlTransform => Write to
Hive using HCatalogIO
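
For reference, a rough Java sketch of that shape (only a sketch: the topic,
schema, and HCatalog settings are placeholders, and the Row-to-HCatRecord
conversion is a hypothetical DoFn, since HCatalogIO.write() consumes
HCatRecords rather than Rows):

// uses org.apache.beam.sdk.io.kafka.KafkaIO, org.apache.beam.sdk.extensions.sql.SqlTransform,
// org.apache.beam.sdk.io.hcatalog.HCatalogIO, org.joda.time.Duration
PCollection<Row> windowedRows =
    pipeline
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")               // placeholder
            .withTopic("events")                               // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(ParDo.of(new ParseToRowFn()))                   // hypothetical: Kafka record -> Beam Row
        .setRowSchema(eventSchema)                             // schema assumed to be defined elsewhere
        // 1-minute fixed windows, so the SQL join/aggregation runs per window
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

windowedRows
    .apply(SqlTransform.query("SELECT ... FROM PCOLLECTION"))  // the actual join/aggregation
    .apply(ParDo.of(new RowToHCatRecordFn()))                  // hypothetical: Beam Row -> HCatRecord
    .apply(HCatalogIO.write()
        .withConfigProperties(hcatConfig)                      // e.g. hive.metastore.uris
        .withDatabase("default")                               // placeholder
        .withTable("target_table"));                           // placeholder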

We are interested in understanding the behaviour when the source is
Unbounded and the Sink is Bounded, as this pipeline is being used for ETL.
Does the same pipeline work for any other Bounded Sink, instead of
HCatalogIO?
What features must the Bounded Sink support for it to be used with an
Unbounded Source?

Are there any best practices or pipeline patterns for these kinds of
pipelines? Will there be any performance hits?

Regards,
Rahul

On Tue, Feb 25, 2020 at 6:57 AM Rui Wang  wrote:

> Sorry, please remove " .apply(Window.into(FixedWindows.of(1 minute))" from
> the query above.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:26 PM Rui Wang  wrote:
>
>> I see. So I guess I didn't fully understand the requirement:
>>
>> Do you want to have a 1-min window join on two unbounded sources and
>> write to the sink when the window closes? Or is there an extra requirement
>> such that you also want to write to the sink every minute per window?
>>
>> For the former, you can do it by SQL:
>>
>> pipeline.apply(KafkaIO.read() ... )
>> .apply(Window.into(FixedWindows.of(1 minute))
>> .apply(SqlTransform(
>>   "SELECT ... FROM
>> (select TUMBLE_START() as window_start, * FROM
>> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>>   JOIN
>> (select TUMBLE_START() as window_start, * FROM
>> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>>on table_a.window_start = table_b.window_start ...")
>> .apply(HCatalogIO.write() ...)
>>
>> But as Kenneth mentioned HCatalogIO might not work as expected.
>>
>>
>>
>> For the latter, the mixed Java and SQL pipeline won't help you.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles  wrote:
>>
>>> I think actually it depends on the pipeline. You cannot do it all in
>>> SQL, but if you mix Java and SQL I think you can do this. If you write this:
>>>
>>> pipeline.apply(KafkaIO.read() ... )
>>> .apply(Window.into(FixedWindows.of(1 minute))
>>> .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>>> .apply(HCatalogIO.write() ...)
>>>
>>> This should apply the SQL on each window. When the SQL does not do any
>>> windowing, it is required to be a "per-window" SQL execution. That is the
>>> spec for SqlTransform. If that does not work, please report your experience.
>>>
>>> But the SQL semantics do not require waiting. Today the stream-to-stream
>>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>>> adopt a better join for this case that can output records with lower
>>> latency.
>>>
>>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>>> you would like.
>>>
>>> Kenn
>>>
>>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang  wrote:
>>>
 SQL does not support such joins with your requirement: writing to the sink
 every 1 min, after the window closes.

 You might be able to use the state and timer API to achieve your goal.



 -Rui

 On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
 shantachakp...@gmail.com> wrote:

> Hi,
>
> I am trying to join inputs from Unbounded Sources then write to
> Bounded Sink.
> The pipeline I'm trying is:
> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>  And, a FixedWindow of 1 minute duration is applied.
>
> I'm expecting the inputs from unbounded sources joined within the
> current window to be written to the HCatalogIO Sink after every 1 min, i.e.
> after each window interval.
>
> Can someone please tell if this is a valid scenario and what is the
> expected behaviour from this kind of scenario?
>
> Regards,
> Shanta
>



Re: Help needed on Dataflow worker exception of WriteToBigQuery

2020-02-24 Thread Reza Rokni
Could you try setting --tempLocation in your pipeline options to a GCS bucket
that your pipeline has access to?
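
For example, when launching the pipeline (just a sketch; the project, region
and bucket names are placeholders, and in the Python SDK the flag is usually
spelled --temp_location):

python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/tmp \
  --staging_location=gs://my-bucket/staging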

Cheers
Reza

On Tue, Feb 25, 2020 at 9:23 AM Wenbing Bai 
wrote:

> Hi there,
>
> I am using WriteToBigQuery in apache-beam Python SDK 2.16. I get this
> error when I run my pipeline in Dataflow Runner.
>
> RuntimeError: IOError: [Errno 2] Not found:
> gs://tmp-e3271c8deb2f655-0-of-1.avro [while running
> 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']
>
> Has anyone had this before? Can I get any hints on where the Dataflow worker
> writes the Avro data?
>
> --
>
>
>
>
>
> Wenbing Bai
>
> Senior Software Engineer, MLP
>
> Cruise
>
> Pronouns: She/Her
>


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
I see. So I guess I didn't fully understand the requirement:

Do you want to have a 1-min window join on two unbounded sources and write
to the sink when the window closes? Or is there an extra requirement such that
you also want to write to the sink every minute per window?

For the former, you can do it by SQL:

pipeline.apply(KafkaIO.read() ... )
.apply(Window.into(FixedWindows.of(1 minute))
.apply(SqlTransform(
  "SELECT ... FROM
(select TUMBLE_START() as window_start, * FROM
stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
  JOIN
(select TUMBLE_START() as window_start, * FROM
stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
   on table_a.window_start = table_b.window_start ...")
.apply(HCatalogIO.write() ...)
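
Filled in a bit more, purely as an illustration (rows1/rows2 are the two
input PCollections of Rows registered under the tags "stream1"/"stream2",
and ts/user_id are hypothetical timestamp and key columns; whether this
exact query shape is accepted may depend on the Beam SQL version):

PCollection<Row> joined =
    PCollectionTuple.of(new TupleTag<>("stream1"), rows1)
        .and(new TupleTag<>("stream2"), rows2)
        .apply(SqlTransform.query(
            "SELECT a.window_start, a.user_id, a.cnt_a, b.cnt_b "
                + "FROM "
                + "  (SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, "
                + "          user_id, COUNT(*) AS cnt_a "
                + "   FROM stream1 "
                + "   GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), user_id) AS a "
                + "JOIN "
                + "  (SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, "
                + "          user_id, COUNT(*) AS cnt_b "
                + "   FROM stream2 "
                + "   GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), user_id) AS b "
                + "ON a.window_start = b.window_start AND a.user_id = b.user_id"));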

But as Kenneth mentioned HCatalogIO might not work as expected.



For the latter, the mixed Java and SQL pipeline won't help you.



-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles  wrote:

> I think actually it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>
> pipeline.apply(KafkaIO.read() ... )
> .apply(Window.into(FixedWindows.of(1 minute))
> .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
> .apply(HCatalogIO.write() ...)
>
> This should apply the SQL on each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
>
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join will do a CoGroupByKey so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
>
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
>
> Kenn
>
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang  wrote:
>
>> SQL does not support such joins with your requirement: writing to the sink
>> every 1 min, after the window closes.
>>
>> You might be able to use the state and timer API to achieve your goal.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>> Sink.
>>> The pipeline I'm trying is:
>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>  And, a FixedWindow of 1 minute duration is applied.
>>>
>>> I'm expecting the inputs from unbounded sources joined within the
>>> current window to be written to the HCatalogIO Sink after every 1 min, i.e.
>>> after each window interval.
>>>
>>> Can someone please tell if this is a valid scenario and what is the
>>> expected behaviour from this kind of scenario?
>>>
>>> Regards,
>>> Shanta
>>>
>>


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
Sorry, please remove " .apply(Window.into(FixedWindows.of(1 minute))" from
the query above.



-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang  wrote:

> I see. So I guess I didn't fully understand the requirement:
>
> Do you want to have a 1-min window join on two unbounded sources and write
> to the sink when the window closes? Or is there an extra requirement such that
> you also want to write to the sink every minute per window?
>
> For the former, you can do it by SQL:
>
> pipeline.apply(KafkaIO.read() ... )
> .apply(Window.into(FixedWindows.of(1 minute))
> .apply(SqlTransform(
>   "SELECT ... FROM
> (select TUMBLE_START() as window_start, * FROM
> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>   JOIN
> (select TUMBLE_START() as window_start, * FROM
> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>on table_a.window_start = table_b.window_start ...")
> .apply(HCatalogIO.write() ...)
>
> But as Kenneth mentioned HCatalogIO might not work as expected.
>
>
>
> For the latter, the mixed Java and SQL pipeline won't help you.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles  wrote:
>
>> I think actually it depends on the pipeline. You cannot do it all in SQL,
>> but if you mix Java and SQL I think you can do this. If you write this:
>>
>> pipeline.apply(KafkaIO.read() ... )
>> .apply(Window.into(FixedWindows.of(1 minute))
>> .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>> .apply(HCatalogIO.write() ...)
>>
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is the
>> spec for SqlTransform. If that does not work, please report your experience.
>>
>> But the SQL semantics do not require waiting. Today the stream-to-stream
>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>> adopt a better join for this case that can output records with lower
>> latency.
>>
>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>> you would like.
>>
>> Kenn
>>
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang  wrote:
>>
>>> SQL does not support such joins with your requirement: writing to the sink
>>> every 1 min, after the window closes.
>>>
>>> You might be able to use the state and timer API to achieve your goal.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>> shantachakp...@gmail.com> wrote:
>>>
 Hi,

 I am trying to join inputs from Unbounded Sources then write to Bounded
 Sink.
 The pipeline I'm trying is:
 Kafka Sources -> SqlTransform -> HCatalogIO  Sink
  And, a FixedWindow of 1 minute duration is applied.

 I'm expecting the inputs from unbounded sources joined within the
 current window to be written to the HCatalogIO Sink after every 1 min, i.e.
 after each window interval.

 Can someone please tell if this is a valid scenario and what is the
 expected behaviour from this kind of scenario?

 Regards,
 Shanta

>>>


Help needed on Dataflow worker exception of WriteToBigQuery

2020-02-24 Thread Wenbing Bai
Hi there,

I am using WriteToBigQuery in apache-beam Python SDK 2.16. I get this error
when I run my pipeline in Dataflow Runner.

RuntimeError: IOError: [Errno 2] Not found:
gs://tmp-e3271c8deb2f655-0-of-1.avro [while running
'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']

Has anyone had this before? Can I get any hints on where the Dataflow worker
writes the Avro data?

-- 





Wenbing Bai

Senior Software Engineer, MLP

Cruise

Pronouns: She/Her



Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-24 Thread Rui Wang
That is great. Feel free to send the patch and I can review it.

-Rui

On Mon, Feb 24, 2020, 3:54 PM Talat Uyarer 
wrote:

> Hi Rui,
>
> I solved the issue. After version 1.21 they are no longer flattened in the
> LogicalPlan.
>
> Thanks for your help. I am going to create a patch for it.
>
> Talat
>
> On Sat, Feb 15, 2020 at 6:26 PM Rui Wang  wrote:
>
>> Because Calcite flattened Rows, BeamSQL didn't need to deal with nested
>> Row structures (as they were flattened in the LogicalPlan).
>>
>> It depends on how that patch works. Nested rows might not immediately work
>> after you apply that patch.
>>
>>
>> -Rui
>>
>> On Fri, Feb 14, 2020 at 3:14 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Do you mean they were flattened before by Calcite, or does Beam flatten
>>> them too?
>>>
>>>
>>>
>>> On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:
>>>
 Nested row types might be less well supported (e.g. Row) because they
 were flattened before anyway.


 -Rui

 On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
 tuya...@paloaltonetworks.com> wrote:

> Thank you for your response.
> I saw it and applied the patch on Calcite 1.20. However, I realized that
> BeamCalcRel does not generate the right code [1] to convert back to Beam types.
> I am working on that now. Please let me know if Apache Beam supports nested
> row types and I have missed it.
>
>
> [1]
> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
> 
>
> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>
>> Calcite has been improved to reconstruct ROW in the output. See [1].
>> Beam needs to update its Calcite dependency to > 1.21 to adopt that.
>>
>>
>>
>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>> 
>>
>>
>> -Rui
>>
>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying out Beam SQL, but something is wrong. I have nested row
>>> records. I read them as a PCollection, apply a SELECT * query, and
>>> compare the result with the initial rows. It looks like nested rows are
>>> flattened by Calcite.
>>> Do you have any idea how I can avoid this?
>>>
>>> I added a test case for my issue:
>>>
>>> Schema nestedSchema =
>>> Schema.builder()
>>> .addInt32Field("f_nestedInt")
>>> .addStringField("f_nestedString")
>>> .addInt32Field("f_nestedIntPlusOne")
>>> .build();
>>> Schema inputType =
>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>> nestedSchema).build();
>>>
>>> PCollection<Row> input =
>>> pipeline.apply(
>>> Create.of(
>>> Row.withSchema(inputType)
>>> .addValues(
>>> 1, Row.withSchema(nestedSchema).addValues(312, 
>>> "CC", 313).build())
>>> .build())
>>> .withRowSchema(inputType))
>>> .setRowSchema(inputType);
>>>
>>> PCollection<Row> result =
>>> input
>>> .apply(
>>> SqlTransform.query(
>>> "SELECT * FROM PCOLLECTION"));
>>>
>>> PAssert.that(result)
>>> .containsInAnyOrder(Row.withSchema(inputType)
>>> .addValues(
>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>> 313).build())
>>> .build());
>>>
>>>
>>> Thank you so much in advance.
>>>
>>>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-24 Thread Talat Uyarer
Hi Rui,

I solved the issue. After version 1.21 they are no longer flattened in the
LogicalPlan.

Thanks for your help. I am going to create a patch for it.

Talat

On Sat, Feb 15, 2020 at 6:26 PM Rui Wang  wrote:

> Because Calcite flattened Rows, BeamSQL didn't need to deal with nested
> Row structures (as they were flattened in the LogicalPlan).
>
> It depends on how that patch works. Nested rows might not immediately work
> after you apply that patch.
>
>
> -Rui
>
> On Fri, Feb 14, 2020 at 3:14 PM Talat Uyarer 
> wrote:
>
>> Do you mean they were flattened before by Calcite, or does Beam flatten
>> them too?
>>
>>
>>
>> On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:
>>
>>> Nested row types might be less well supported (e.g. Row) because they
>>> were flattened before anyway.
>>>
>>>
>>> -Rui
>>>
>>> On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Thank you for your response.
 I saw it and applied the patch on Calcite 1.20. However, I realized that
 BeamCalcRel does not generate the right code [1] to convert back to Beam types.
 I am working on that now. Please let me know if Apache Beam supports nested
 row types and I have missed it.


 [1]
 https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
 

 On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:

> Calcite has been improved to reconstruct ROW in the output. See [1].
> Beam needs to update its Calcite dependency to > 1.21 to adopt that.
>
>
>
> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
> 
>
>
> -Rui
>
> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi,
>>
>> I am trying out Beam SQL, but something is wrong. I have nested row
>> records. I read them as a PCollection, apply a SELECT * query, and
>> compare the result with the initial rows. It looks like nested rows are
>> flattened by Calcite.
>> Do you have any idea how I can avoid this?
>>
>> I added a test case for my issue:
>>
>> Schema nestedSchema =
>> Schema.builder()
>> .addInt32Field("f_nestedInt")
>> .addStringField("f_nestedString")
>> .addInt32Field("f_nestedIntPlusOne")
>> .build();
>> Schema inputType =
>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>> nestedSchema).build();
>>
>> PCollection<Row> input =
>> pipeline.apply(
>> Create.of(
>> Row.withSchema(inputType)
>> .addValues(
>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>> 313).build())
>> .build())
>> .withRowSchema(inputType))
>> .setRowSchema(inputType);
>>
>> PCollection<Row> result =
>> input
>> .apply(
>> SqlTransform.query(
>> "SELECT * FROM PCOLLECTION"));
>>
>> PAssert.that(result)
>> .containsInAnyOrder(Row.withSchema(inputType)
>> .addValues(
>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>> 313).build())
>> .build());
>>
>>
>> Thank you so much in advance.
>>
>>


Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-24 Thread Maximilian Michels

Thank you for reporting / filing / collecting the issues.

There is a fix pending: https://github.com/apache/beam/pull/10950

As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will 
check out the Flink 1.10 PR tomorrow.


Cheers,
Max

On 24.02.20 09:26, Ismaël Mejía wrote:
We are cutting the release branch for 2.20.0 next Wednesday, so not sure
if these tickets will make it, but hopefully.


For ref,
BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible 
with Flink 1.10

BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2

In any case, if you have cycles to help test any of the related tickets /
PRs, that would help too.



On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias  wrote:


Hi Kyle,

thank you for creating the JIRA ticket, I think my best option right
now is to wait for a Beam version that is running on Flink 1.10 then
- unless there is a new Beam release around the corner :)

Best,
Tobi

On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <kcwea...@google.com> wrote:

Hi Tobi,

This seems like a bug with Beam 2.19. I filed
https://issues.apache.org/jira/browse/BEAM-9345 to track the issue.

 > What puzzles me is that the session cluster should be allowed
to have multiple environments in detached mode - or am I wrong?

It looks like that check is removed in Flink 1.10:
https://issues.apache.org/jira/browse/FLINK-15201

Thanks for reporting.
Kyle

On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias
<tobias.kay...@ricardo.ch> wrote:

Hello,

I am trying to upgrade from a Flink session cluster 1.8 to
1.9 and from Beam 2.16.0 to 2.19.0.
Everything went quite smoothly, the local runner and the
local Flink runner work flawlessly.

However when I:
   1. Generate a Beam jar for the FlinkRunner via maven (mvn
package -PFlinkRunner)
   2. Glue that into a Flink 1.9 docker image
   3. Start the image as a Standalone Session Cluster

When I try to launch the first pipeline I get the following
exception

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to construct instance from factory method FlinkRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Failed to construct instance from factory method FlinkRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
        at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
        at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
        at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
SQL does not support such joins with your requirement: writing to the sink
every 1 min, after the window closes.

You might be able to use the state and timer API to achieve your goal.
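
A very rough, untested sketch of that idea: buffer the keyed elements in a
BagState and set an event-time timer for the end of the window, so the
buffered batch is emitted once the window closes (the key/value types and
the downstream write are placeholders):

// uses org.apache.beam.sdk.state.*, org.apache.beam.sdk.transforms.DoFn,
// org.apache.beam.sdk.transforms.windowing.BoundedWindow, java.util.ArrayList/List
class BufferUntilWindowEndFn extends DoFn<KV<String, Row>, KV<String, Iterable<Row>>> {

  @StateId("buffer")
  private final StateSpec<BagState<Row>> bufferSpec = StateSpecs.bag();

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  @TimerId("windowEnd")
  private final TimerSpec windowEndSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<Row> buffer,
      @StateId("key") ValueState<String> key,
      @TimerId("windowEnd") Timer windowEnd) {
    buffer.add(c.element().getValue());
    key.write(c.element().getKey());
    // fire when the watermark passes the end of the current (1-min) window
    windowEnd.set(window.maxTimestamp());
  }

  @OnTimer("windowEnd")
  public void onWindowEnd(
      OnTimerContext c,
      @StateId("buffer") BagState<Row> buffer,
      @StateId("key") ValueState<String> key) {
    List<Row> rows = new ArrayList<>();
    buffer.read().forEach(rows::add);   // materialize before clearing the state
    c.output(KV.of(key.read(), rows));
    buffer.clear();
  }
}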



-Rui

On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram 
wrote:

> Hi,
>
> I am trying to join inputs from Unbounded Sources then write to Bounded
> Sink.
> The pipeline I'm trying is:
> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>  And, a FixedWindow of 1 minute duration is applied.
>
> I'm expecting the inputs from unbounded sources joined within the current
> window to be written to the HCatalogIO Sink after every 1 min, i.e. after
> each window interval.
>
> Can someone please tell if this is a valid scenario and what is the
> expected behaviour from this kind of scenario?
>
> Regards,
> Shanta
>


Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread shanta chakpram
Hi,

I am trying to join inputs from Unbounded Sources then write to Bounded
Sink.
The pipeline I'm trying is:
Kafka Sources -> SqlTransform -> HCatalogIO  Sink
 And, a FixedWindow of 1 minute duration is applied.

I'm expecting the inputs from unbounded sources joined within the current
window to be written to the HCatalogIO Sink after every 1 min, i.e. after
each window interval.

Can someone please tell if this is a valid scenario and what is the
expected behaviour from this kind of scenario?

Regards,
Shanta


Re: Bay Area Beam Meetup 19 Feb (Last Wednesday).

2020-02-24 Thread Ahmet Altay
Thank you for sharing talks!

On Fri, Feb 21, 2020 at 9:18 PM Austin Bennett 
wrote:

> Hi All,
>
> We had a meetup @Sentry.io on Wednesday -- with a solid 40+ engaged
> attendees.
>
> Thanks for those that joined in person, and for those that were unable,
> talks can be found online -->
> Syd's talk (real time data warehouse): https://youtu.be/rFK6drAWN40
> Mike's talk (beam in production): https://youtu.be/GOQVTr5hBoQ
>
> Cheers,
> Austin
>
>
> P.S.  The event page for more info
> https://www.meetup.com/San-Francisco-Apache-Beam/events/268363008/
>
>
>


Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-24 Thread Ismaël Mejía
We are cutting the release branch for 2.20.0 next Wednesday, so not sure if
these tickets will make it, but hopefully.

For ref,
BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible with
Flink 1.10
BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2

In any case, if you have cycles to help test any of the related tickets / PRs,
that would help too.


On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias 
wrote:

> Hi Kyle,
>
> thank you for creating the JIRA ticket, I think my best option right now
> is to wait for a Beam version that is running on Flink 1.10 then - unless
> there is a new Beam release around the corner :)
>
> Best,
> Tobi
>
> On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver  wrote:
>
>> Hi Tobi,
>>
>> This seems like a bug with Beam 2.19. I filed
>> https://issues.apache.org/jira/browse/BEAM-9345 to track the issue.
>>
>> > What puzzles me is that the session cluster should be allowed to have
>> multiple environments in detached mode - or am I wrong?
>>
>> It looks like that check is removed in Flink 1.10:
>> https://issues.apache.org/jira/browse/FLINK-15201
>>
>> Thanks for reporting.
>> Kyle
>>
>> On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> I am trying to upgrade from a Flink session cluster 1.8 to 1.9 and from
>>> Beam 2.16.0 to 2.19.0.
>>> Everything went quite smoothly, the local runner and the local Flink
>>> runner work flawlessly.
>>>
>>> However when I:
>>>   1. Generate a Beam jar for the FlinkRunner via maven (mvn package
>>> -PFlinkRunner)
>>>   2. Glue that into a Flink 1.9 docker image
>>>   3. Start the image as a Standalone Session Cluster
>>>
>>> When I try to launch the first pipeline I get the following exception
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Failed to construct instance from factory method
>>> FlinkRunner#fromOptions(interface
>>> org.apache.beam.sdk.options.PipelineOptions)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>>> Caused by: java.lang.RuntimeException: Failed to construct instance from
>>> factory method FlinkRunner#fromOptions(interface
>>> org.apache.beam.sdk.options.PipelineOptions)
>>> at
>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>> at
>>> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
>>> at
>>> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>> at
>>> ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>> ... 9 more
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
>>> ... 19 more
>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
>>> environments cannot be created in detached mode
>>> at
>>> org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
>>> at java.util.Optional.map(Optional.java:215)
>>> at
>>>