Re: Bigtable for BeamSQL - question about the schema design

2020-11-16 Thread Piotr Szuberski


> >Is there a jira for this issue?

Sorry for the delay. Luckily Rui answered it better than I would.


> https://issues.apache.org/jira/browse/BEAM-10896 is the one that I am aware
> of. Though it aims to improve UNNEST, I think it could improve ARRAY
> support in general. Also, as Kenneth mentioned, it might depend on a
> vendored Calcite upgrade to at least 1.23.0.

It sounds like it's exactly what is needed!
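For anyone following along, here is a minimal sketch in Java of the indexing
workaround discussed later in this thread. The schema and names (some_table,
complex_array, row_field) are illustrative, not from the actual Bigtable work:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

public class ArrayOfRowExample {
  public static void main(String[] args) {
    // A table with a single ARRAY<ROW<row_field VARCHAR>> column.
    Schema element = Schema.builder().addStringField("row_field").build();
    Schema table =
        Schema.builder().addArrayField("complex_array", FieldType.row(element)).build();

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<Row> rows =
        p.apply(
            Create.of(
                    Row.withSchema(table)
                        .addValue(
                            java.util.Collections.singletonList(
                                Row.withSchema(element).addValue("v").build()))
                        .build())
                .withRowSchema(table));

    // Indexing into the array works (Beam SQL arrays are 1-indexed);
    // UNNEST over ARRAY<ROW> is what BEAM-10896 tracks.
    PCollection<Row> out =
        PCollectionTuple.of(new TupleTag<>("some_table"), rows)
            .apply(SqlTransform.query(
                "SELECT t.complex_array[1].row_field FROM some_table t"));

    p.run().waitUntilFinish();
  }
}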



Re: Getting ClassCastException

2020-11-16 Thread Sonam Ramchand
Here you are,

org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators$1 cannot be cast to org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlAggFunction

java.lang.ClassCastException: org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators$1 cannot be cast to org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlAggFunction
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convertAggCall(AggregateScanConverter.java:202)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:94)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:50)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:99)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Collections$2.tryAdvance(Collections.java:4719)
    at java.util.Collections$2.forEachRemaining(Collections.java:4727)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:98)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:86)
    at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:52)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:140)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:168)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:156)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:140)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testLogicalAndZetaSQL(ZetaSqlDialectSpecTest.java:4334)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
    at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at

Website Revamp Update – Week 3 & 4

2020-11-16 Thread Griselda Cuevas
Hi Beam Community!

Sprints 3 and 4 of the website revamp project have concluded, and below is a
recap of the work the team has done. The working notes can be found here [1]
and the presentations from the past two meetings here [2]. The PRD has been
split into a requirements doc and a Technical Design Doc (TDD) that contains
discussions on designs and copy; you can find the PRD here [3] and the TDD
here [4].

Lastly, remember that you can join the weekly calls if you're interested in
discussing any changes. Let us know in this thread if you'd like to attend;
meetings happen on Wednesdays at 8 a.m. Pacific Time.

Sprint 3 and 4 Recap

   - Join the community page: the pathway was finalized and copy was added to
   the TDD.
   - Contribute page changes: “Get started contributing” and “Become a
   committer” will be moved to a higher level in the page architecture. We
   will keep “Committer guide”, “Release guide”, and “Get help” under
   Committers.
   - Icons to illustrate the Beam value proposition (Unified, Portable,
   Extensible, and Value of the Community) were finalized.
   - The "Powered by" page will be redesigned to include use cases from Beam
   Summit. BEAM-11225 is tracking permissions and requests to presenters.
   - Beam model illustration: brainstormed with community members on the
   changes needed and the key concepts to illustrate.
   - Copy for the following pages is needed: Welcome to the Beam Community,
   Contact Us, Beam Community Practices, Become a Committer, Contribution
   Guide, and Documentation Overview. Jiras to be created.

Next Sprint:

   - Finalize details on the newly designed pages
   - Write copy for the newly designed pages
   - Production will start


[1]
https://docs.google.com/document/d/1CqssBpRt1EjpV0nCBaS9WGaGHt_4stKXbwMz9gYfmwc/edit
[2]
https://docs.google.com/presentation/d/1ay8eKFEi8rz18uUMDqDoBP6gEtYJTo34VZr1EXjqIZM/edit?usp=sharing

[3] https://s.apache.org/beam-site-revamp
[4] https://s.apache.org/beam-website-revamp-tdd


Re: PTransform Annotations Proposal

2020-11-16 Thread Robert Bradshaw
I agree things like GPU, high-mem, etc. belong to the environment. If
annotations are truly advisory, one can imagine merging environments
by taking the union of annotations and still producing a correct
pipeline. (This would mean that annotations would have to be a
multi-map...)
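
To make the multimap point concrete, a minimal sketch of that union merge,
with a hypothetical urn -> payloads multimap (this is not an existing Beam
API, just the shape the idea implies):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EnvironmentMerging {
  // Union of two environments' advisory annotations. A plain map<urn, bytes>
  // would force dropping one side on a key conflict; a multimap keeps both,
  // which is only safe because annotations don't affect correctness.
  static Map<String, List<byte[]>> unionAnnotations(
      Map<String, List<byte[]>> a, Map<String, List<byte[]>> b) {
    Map<String, List<byte[]>> merged = new HashMap<>();
    for (Map<String, List<byte[]>> m : Arrays.asList(a, b)) {
      m.forEach(
          (urn, payloads) ->
              merged.computeIfAbsent(urn, k -> new ArrayList<>()).addAll(payloads));
    }
    return merged;
  }
}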

On the other hand, this doesn't seem to handle the case of privacy
analysis, which could apply to composites without applying to any
individual component, and doesn't really make sense as part of a
fusion/execution story.

On Mon, Nov 16, 2020 at 4:00 PM Robert Burke  wrote:
>
> That's good historical context.
>
> But then we'd still need to codify that the annotation would be optional,
> and not affect correctness.
>
> Conflicts become easier to manage (environments with conflicting
> annotations simply don't get merged, and stay as distinct environments), but
> are still notionally annotation-dependent. Do most runners handle
> environments so individually, though?
>
> Reuven's question is a good one though. For the Go SDK, and the proposed
> implementation I saw, they only applied to leaf nodes. This is an artifact of
> how the Go SDK handles composites. Nothing stops it from being implemented on
> the composites Go has, but it didn't make sense otherwise. AFAICT, composites
> are generally for organizational convenience and not for functional aspects.
> Is this wrong? After all, does it make sense for environments to be on both
> leaves and composites? It's the same issue there.
>
>
> On Mon, Nov 16, 2020, 3:45 PM Kenneth Knowles  wrote:
>>
>> I am +1 to the proposal but believe it should be moved to the Environment. I 
>> could be convinced otherwise, but would want to really understand the 
>> details.
>>
>> I think we haven't done a great job communicating the purpose of the 
>> Environment proto. It was explicitly created for this purpose.
>>
>> 1. It tells the runner things it needs to know to interpret the DoFn (or 
>> other UDF). So these are the existing proto fields like docker image (in the 
>> payload) and required artifacts that were staged.
>> 2. It is also the place for additional requirements or hints like "high mem" 
>> or "GPU" etc.
>>
>> Every user function has an associated environment, and environments exist 
>> only for the purpose of executing user functions. In fact, Environment 
>> originated as inline requirements/attributes for a user function proto 
>> message and was separated just to make the proto smaller.
>>
>> A PTransform is an abstract concept for organizing the graph, not an 
>> executable thing. So a hint/capability/requirement on a PTransform only 
>> really makes sense as a scoping mechanism for applying a hint to a bunch of 
>> functions within a subgraph. This seems like a user interface concern and 
>> the SDK should own propagating the hints. If the hint truly applies to the 
>> whole PTransform and *not* the parts, then I am interested in learning about 
>> that.
>>
>> Kenn
>>
>> On Mon, Nov 16, 2020 at 10:54 AM Robert Burke  wrote:
>>>
>>> That's a good question.
>>>
>>> I think the main difference is a matter of scope. Annotations would apply
>>> to a PTransform while an environment applies to sets of transforms. A
>>> difference is the optional nature of the annotations: they don't affect
>>> correctness. Runners don't need to do anything with them and can still
>>> execute the pipeline correctly.
>>>
>>> Consider a privacy analysis on a pipeline graph. An annotation indicating 
>>> that a transform provides a certain level of anonymization can be used in 
>>> an analysis to determine if the downstream transforms are encountering raw 
>>> data or not.
>>>
>>> From my understanding (which can be wrong) environments are rigid. 
>>> Transforms in different environments can't be fused. "This is the python 
>>> env", "this is the java env" can't be merged together. It's not clear to me 
>>> that we have defined when environments are safely fuseable outside of 
>>> equality. There's value in that simplicity.
>>>
>>> AFAICT, environment has less to do with the machines a pipeline is
>>> executing on than with the kinds of SDK pipelines it understands and can
>>> execute.
>>>
>>>
>>>
>>> On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:
>>>>
>>>>> Another example of an optional annotation is marking a transform to run
>>>>> on secure hardware, or to give hints to profiling/dynamic analysis tools.
>>>>
>>>> There seems to be a lot of overlap between this idea and Environments.
>>>> Can you talk about how you feel they may be different or related?  For
>>>> example, I could see annotations as a way of tagging transforms with an
>>>> Environment, or I could see Environments becoming a specialized form of
>>>> annotation.
>>>>
>>>> -chad



Re: PTransform Annotations Proposal

2020-11-16 Thread Robert Burke
That's good historical context.

But then we'd still need to codify that the annotation would be
optional, and not affect correctness.

Conflicts become easier to manage (environments with conflicting
annotations simply don't get merged, and stay as distinct environments), but
are still notionally annotation-dependent. Do most runners handle
environments so individually, though?

Reuven's question is a good one though. For the Go SDK, and the proposed
implementation I saw, they only applied to leaf nodes. This is an artifact
of how the Go SDK handles composites. Nothing stops it from being
implemented on the composites Go has, but it didn't make sense otherwise.
AFAICT, composites are generally for organizational convenience and not for
functional aspects. Is this wrong? After all, does it make sense for
environments to be on both leaves and composites? It's the same issue
there.


On Mon, Nov 16, 2020, 3:45 PM Kenneth Knowles  wrote:

> I am +1 to the proposal but believe it should be moved to the Environment.
> I could be convinced otherwise, but would want to really understand the
> details.
>
> I think we haven't done a great job communicating the purpose of the
> Environment proto. It was explicitly created for this purpose.
>
> 1. It tells the runner things it needs to know to interpret the DoFn (or
> other UDF). So these are the existing proto fields like docker image (in
> the payload) and required artifacts that were staged.
> 2. It is also the place for additional requirements or hints like "high
> mem" or "GPU" etc.
>
> Every user function has an associated environment, and environments exist
> only for the purpose of executing user functions. In fact, Environment
> originated as inline requirements/attributes for a user function proto
> message and was separated just to make the proto smaller.
>
> A PTransform is an abstract concept for organizing the graph, not an
> executable thing. So a hint/capability/requirement on a PTransform only
> really makes sense as a scoping mechanism for applying a hint to a bunch of
> functions within a subgraph. This seems like a user interface concern and
> the SDK should own propagating the hints. If the hint truly applies to the
> whole PTransform and *not* the parts, then I am interested in learning
> about that.
>
> Kenn
>
> On Mon, Nov 16, 2020 at 10:54 AM Robert Burke  wrote:
>
>> That's a good question.
>>
>> I think the main difference is a matter of scope. Annotations would apply
>> to a PTransform while an environment applies to sets of transforms. A
>> difference is the optional nature of the annotations: they don't affect
>> correctness. Runners don't need to do anything with them and can still
>> execute the pipeline correctly.
>>
>> Consider a privacy analysis on a pipeline graph. An annotation indicating
>> that a transform provides a certain level of anonymization can be used in
>> an analysis to determine if the downstream transforms are encountering raw
>> data or not.
>>
>> From my understanding (which can be wrong) environments are rigid.
>> Transforms in different environments can't be fused. "This is the python
>> env", "this is the java env" can't be merged together. It's not clear to me
>> that we have defined when environments are safely fuseable outside of
>> equality. There's value in that simplicity.
>>
>> AFAICT, environment has less to do with the machines a pipeline is
>> executing on than with the kinds of SDK pipelines it understands
>> and can execute.
>>
>>
>>
>> On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:
>>
>>>
>>>> Another example of an optional annotation is marking a transform to run
>>>> on secure hardware, or to give hints to profiling/dynamic analysis tools.

>>>
>>> There seems to be a lot of overlap between this idea and Environments.
>>> Can you talk about how you feel they may be different or related?  For
>>> example, I could see annotations as a way of tagging transforms with an
>>> Environment, or I could see Environments becoming a specialized form of
>>> annotation.
>>>
>>> -chad
>>>
>>>


Re: PTransform Annotations Proposal

2020-11-16 Thread Kenneth Knowles
I am +1 to the proposal but believe it should be moved to the Environment.
I could be convinced otherwise, but would want to really understand the
details.

I think we haven't done a great job communicating the purpose of the
Environment proto. It was explicitly created for this purpose.

1. It tells the runner things it needs to know to interpret the DoFn (or
other UDF). So these are the existing proto fields like docker image (in
the payload) and required artifacts that were staged.
2. It is also the place for additional requirements or hints like "high
mem" or "GPU" etc.

Every user function has an associated environment, and environments exist
only for the purpose of executing user functions. In fact, Environment
originated as inline requirements/attributes for a user function proto
message and was separated just to make the proto smaller.

A PTransform is an abstract concept for organizing the graph, not an
executable thing. So a hint/capability/requirement on a PTransform only
really makes sense as a scoping mechanism for applying a hint to a bunch of
functions within a subgraph. This seems like a user interface concern and
the SDK should own propagating the hints. If the hint truly applies to the
whole PTransform and *not* the parts, then I am interested in learning
about that.
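
As a concrete illustration of point 1, here is a hedged sketch of building a
docker Environment through the generated model protos (the image name is
just an example):

import org.apache.beam.model.pipeline.v1.RunnerApi;

class DockerEnvironmentExample {
  static RunnerApi.Environment dockerEnvironment() {
    // The docker image is carried in the payload, keyed by the environment URN.
    return RunnerApi.Environment.newBuilder()
        .setUrn("beam:env:docker:v1")
        .setPayload(
            RunnerApi.DockerPayload.newBuilder()
                .setContainerImage("apache/beam_java8_sdk:2.25.0")
                .build()
                .toByteString())
        .build();
  }
}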

Kenn

On Mon, Nov 16, 2020 at 10:54 AM Robert Burke  wrote:

> That's a good question.
>
> I think the main difference is a matter of scope. Annotations would apply
> to a PTransform while an environment applies to sets of transforms. A
> difference is the optional nature of the annotations: they don't affect
> correctness. Runners don't need to do anything with them and can still
> execute the pipeline correctly.
>
> Consider a privacy analysis on a pipeline graph. An annotation indicating
> that a transform provides a certain level of anonymization can be used in
> an analysis to determine if the downstream transforms are encountering raw
> data or not.
>
> From my understanding (which can be wrong) environments are rigid.
> Transforms in different environments can't be fused. "This is the python
> env", "this is the java env" can't be merged together. It's not clear to me
> that we have defined when environments are safely fuseable outside of
> equality. There's value in that simplicity.
>
> AFAICT, environment has less to do with the machines a pipeline is
> executing on than with the kinds of SDK pipelines it understands
> and can execute.
>
>
>
> On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:
>
>>
>>> Another example of an optional annotation is marking a transform to run
>>> on secure hardware, or to give hints to profiling/dynamic analysis tools.
>>>
>>
>> There seems to be a lot of overlap between this idea and Environments.
>> Can you talk about how you feel they may be different or related?  For
>> example, I could see annotations as a way of tagging transforms with an
>> Environment, or I could see Environments becoming a specialized form of
>> annotation.
>>
>> -chad
>>
>>


Re: Bigtable for BeamSQL - question about the schema design

2020-11-16 Thread Rui Wang
On Tue, Nov 10, 2020 at 10:25 AM Brian Hulette  wrote:

>
>
> On Tue, Nov 10, 2020 at 5:46 AM Piotr Szuberski <
> piotr.szuber...@polidea.com> wrote:
>
>> Unfortunately, according to the documentation, BeamSQL doesn't work well
>> with ARRAY<ROW>, like ARRAY<ROW<...>>, which I confirmed empirically.
>>
>>
> Is there a jira for this issue?
>

https://issues.apache.org/jira/browse/BEAM-10896 is the one that I am aware
of. Though it aims to improve UNNEST, I think it could improve ARRAY
support in general. Also, as Kenneth mentioned, it might depend on a
vendored Calcite upgrade to at least 1.23.0.


>
>
>> The only way to retrieve the array's values was to get them by index, e.g.
>> SELECT t.complex_array[1].row_field from some_table t;
>>
>> UNNEST and just taking the array don't work with ARRAY<ROW>.
>>
>> I think that if a user wants a list of cells then it has to be truncated
>> to "value" only, and to just the most recent cell if it has to be a ROW
>> with timestamp, labels, etc.
>>
>> This limitation doesn't happen with the flattened rows, but another one
>> takes its place - the value has to be of BINARY type and then parsed by
>> the user.
>>
>> I'll try to make it as flexible as possible - the ARRAY<ROW> limitation
>> makes it much less straightforward.
>>
>> Thanks for the references! As I understand it, Flink cares only about the
>> most recent values of the column and ignores the timestamps and labels? I
>> refer to this:
>> family1 ROW<q1 ...>  -- family1.q1 is the most recent cell value of the
>> q1 column?
>>
>
> I can't find a reference to confirm, but that seems like the most likely
> explanation. Similar to how BigQuery behaves with the onlyReadLatest option.
>
>
>>
>> On 2020/11/06 20:43:03, Ismaël Mejía  wrote:
>> > Thanks for the references Rui. I think it is worth considering how
>> > open source systems do it.
>> > The great thing about this is that we could 'easily' map Piotr's work
>> > for Bigtable to HBase too once it is done.
>
> >
>> > On Fri, Nov 6, 2020 at 8:22 PM Rui Wang  wrote:
>> > >
>> > > Another two references are from how Flink and Spark use HBase via SQL:
>>
>
> Great point! I forgot that HBase is modeled after BigTable
>
>
>> > >
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hbase.html
>> > > https://stackoverflow.com/questions/39530938/sparksql-on-hbase-tables
>> > >
>> > > -Rui
>> > >
>> > > On Thu, Nov 5, 2020 at 9:46 AM Piotr Szuberski <
>> piotr.szuber...@polidea.com> wrote:
>> > >>
>> > >> Thanks for the resources! I'll try to follow the BQ approach. I'd
>> > >> also add something like a flattened schema so the user can use
>> > >> simple types only. It would be limited to BINARY values though.
>> > >> Something like:
>> > >> CREATE EXTERNAL TABLE(
>> > >>   key VARCHAR NOT NULL,
>> > >>   family VARCHAR NOT NULL,
>> > >>   column VARCHAR NOT NULL,
>> > >>   value BINARY NOT NULL,
>> > >>   timestampMicros BIGINT NOT NULL
>> > >> )
>> > >> The cells array would be flattened (denormalized) and easy to use.
>> > >> In the case of single-valued cells it would also be quite efficient.
>> > >>
>> > >> On 2020/11/05 00:22:44, Brian Hulette  wrote:
>> > >> > I think we should take a look at how BigTable is integrated with
>> other SQL
>> > >> > systems. For example we could get some inspiration from BigQuery's
>> support
>> > >> > for querying BigTable data [1]. It looks like by default it uses
>> something
>> > >> > like (1), but they recognize this is difficult to process with
>> SQL, so they
>> > >> > have an option you can set to elevate certain columns as
>> sub-fields (more
>> > >> > like (2)), and you can also indicate you only want to get the
>> latest value
>> > >> > for each column.
>> > >> >
>> > >> > In any case this may be a good candidate for not requiring the
>> user to
>> > >> > actually specify a schema, and instead letting the table be fully
>> > >> > determined by options.
>> > >> >
>> > >> > [1] https://cloud.google.com/bigquery/external-data-bigtable
>> > >> >
>> > >> > On Tue, Nov 3, 2020 at 11:41 PM Piotr Szuberski <
>> piotr.szuber...@polidea.com>
>> > >> > wrote:
>> > >> >
>> > >> > > I've dug into the topic a bit and I think the 2nd approach will
>> > >> > > fit better. The schema in Bigtable is not supposed to change that
>> > >> > > often, and specifying our own schema is more SQL-like and will
>> > >> > > cause less potential trouble.
>> > >> > >
>> > >> > > On 2020/11/03 11:01:57, Piotr Szuberski <
>> piotr.szuber...@polidea.com>
>> > >> > > wrote:
>> > >> > > > I'm going to write a Bigtable table for BeamSQL and I have a
>> > >> > > > question about the schema design - which one would be
>> > >> > > > preferable.
>> > >> > > >
>> > >> > > > Bigtable stores its data in a table with rows that contain a
>> > >> > > > key and a 3-dimensional array, where the 1st dimension is
>> > >> > > > families with names, the 2nd dimension is columns with
>> > >> > > > qualifiers, and the 3rd is cells containing a timestamp and
>> > >> > > > value.
>> > >> > > >
>> > >> > > > Two design solutions come to mind:
>> > >> > > > 1) Fix schema to be a generic Bigtable 

Re: Bigtable for BeamSQL - question about the schema design

2020-11-16 Thread Kenneth Knowles
If I recall correctly, we need to upgrade Calcite for this.

On Tue, Nov 10, 2020 at 10:24 AM Brian Hulette  wrote:

>
>
> On Tue, Nov 10, 2020 at 5:46 AM Piotr Szuberski <
> piotr.szuber...@polidea.com> wrote:
>
>> Unfortunately, according to the documentation, BeamSQL doesn't work well
>> with ARRAY<ROW>, like ARRAY<ROW<...>>, which I confirmed empirically.
>>
>>
> Is there a jira for this issue?
>
>
>> The only way to retrieve the array's values was to get them by index, e.g.
>> SELECT t.complex_array[1].row_field from some_table t;
>>
>> UNNEST and just taking the array don't work with ARRAY<ROW>.
>>
>> I think that if a user wants a list of cells then it has to be truncated
>> to "value" only, and to just the most recent cell if it has to be a ROW
>> with timestamp, labels, etc.
>>
>> This limitation doesn't happen with the flattened rows, but another one
>> takes its place - the value has to be of BINARY type and then parsed by
>> the user.
>>
>> I'll try to make it as flexible as possible - the ARRAY<ROW> limitation
>> makes it much less straightforward.
>>
>> Thanks for the references! As I understand it, Flink cares only about the
>> most recent values of the column and ignores the timestamps and labels? I
>> refer to this:
>> family1 ROW<q1 ...>  -- family1.q1 is the most recent cell value of the
>> q1 column?
>>
>
> I can't find a reference to confirm, but that seems like the most likely
> explanation. Similar to how BigQuery behaves with the onlyReadLatest option.
>
>
>>
>> On 2020/11/06 20:43:03, Ismaël Mejía  wrote:
>> > Thanks for the references Rui. I think it is worth considering how
>> > open source systems do it.
>> > The great thing about this is that we could 'easily' map Piotr's work
>> > for Bigtable to HBase too once it is done.
>
> >
>> > On Fri, Nov 6, 2020 at 8:22 PM Rui Wang  wrote:
>> > >
>> > > Another two references are from how Flink and Spark use HBase via SQL:
>>
>
> Great point! I forgot that HBase is modeled after BigTable
>
>
>> > >
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hbase.html
>> > > https://stackoverflow.com/questions/39530938/sparksql-on-hbase-tables
>> > >
>> > > -Rui
>> > >
>> > > On Thu, Nov 5, 2020 at 9:46 AM Piotr Szuberski <
>> piotr.szuber...@polidea.com> wrote:
>> > >>
>> > >> Thanks for the resources! I'll try to follow the BQ approach. I'd
>> > >> also add something like a flattened schema so the user can use
>> > >> simple types only. It would be limited to BINARY values though.
>> > >> Something like:
>> > >> CREATE EXTERNAL TABLE(
>> > >>   key VARCHAR NOT NULL,
>> > >>   family VARCHAR NOT NULL,
>> > >>   column VARCHAR NOT NULL,
>> > >>   value BINARY NOT NULL,
>> > >>   timestampMicros BIGINT NOT NULL
>> > >> )
>> > >> The cells array would be flattened (denormalized) and easy to use.
>> > >> In the case of single-valued cells it would also be quite efficient.
>> > >>
>> > >> On 2020/11/05 00:22:44, Brian Hulette  wrote:
>> > >> > I think we should take a look at how BigTable is integrated with
>> other SQL
>> > >> > systems. For example we could get some inspiration from BigQuery's
>> support
>> > >> > for querying BigTable data [1]. It looks like by default it uses
>> something
>> > >> > like (1), but they recognize this is difficult to process with
>> SQL, so they
>> > >> > have an option you can set to elevate certain columns as
>> sub-fields (more
>> > >> > like (2)), and you can also indicate you only want to get the
>> latest value
>> > >> > for each column.
>> > >> >
>> > >> > In any case this may be a good candidate for not requiring the
>> user to
>> > >> > actually specify a schema, and instead letting the table be fully
>> > >> > determined by options.
>> > >> >
>> > >> > [1] https://cloud.google.com/bigquery/external-data-bigtable
>> > >> >
>> > >> > On Tue, Nov 3, 2020 at 11:41 PM Piotr Szuberski <
>> piotr.szuber...@polidea.com>
>> > >> > wrote:
>> > >> >
>> > >> > > I've dug into the topic a bit and I think the 2nd approach will
>> > >> > > fit better. The schema in Bigtable is not supposed to change that
>> > >> > > often, and specifying our own schema is more SQL-like and will
>> > >> > > cause less potential trouble.
>> > >> > >
>> > >> > > On 2020/11/03 11:01:57, Piotr Szuberski <
>> piotr.szuber...@polidea.com>
>> > >> > > wrote:
>> > >> > > > I'm going to write a Bigtable table for BeamSQL and I have a
>> > >> > > > question about the schema design - which one would be
>> > >> > > > preferable.
>> > >> > > >
>> > >> > > > Bigtable stores its data in a table with rows that contain a
>> > >> > > > key and a 3-dimensional array, where the 1st dimension is
>> > >> > > > families with names, the 2nd dimension is columns with
>> > >> > > > qualifiers, and the 3rd is cells containing a timestamp and
>> > >> > > > value.
>> > >> > > >
>> > >> > > > Two design solutions come to mind:
>> > >> > > > 1) Fix schema to be a generic Bigtable row:
>> > >> > > >
>> > >> > > > Row(key, Array(Row(family, Array(Row(qualifier,
>> Array(Row(value,
>> > >> > > timestamp)))))))
>> > >> > > >
>> > >> > > > Then the table creation definition would 
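
For concreteness, a hedged sketch of the option (1) generic row above,
expressed with Beam's Java Schema builder (field names follow the thread and
are illustrative only):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

class BigtableSchemas {
  // Innermost dimension: a cell is a value plus a timestamp.
  static final Schema CELL =
      Schema.builder().addByteArrayField("value").addInt64Field("timestampMicros").build();
  // A column is a qualifier plus its cells.
  static final Schema COLUMN =
      Schema.builder()
          .addStringField("qualifier")
          .addArrayField("cells", FieldType.row(CELL))
          .build();
  // A family is a name plus its columns.
  static final Schema FAMILY =
      Schema.builder()
          .addStringField("name")
          .addArrayField("columns", FieldType.row(COLUMN))
          .build();
  // The full generic Bigtable row: key plus families.
  static final Schema BIGTABLE_ROW =
      Schema.builder()
          .addStringField("key")
          .addArrayField("families", FieldType.row(FAMILY))
          .build();
}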

Re: PTransform Annotations Proposal

2020-11-16 Thread Jan Lukavský
Minor correction: the CoGBK broadcast vs. full shuffle is probably not an 
ideal example, because it still requires grouping the larger PCollection 
(if not already grouped). If we take a Join PTransform that acts on the 
Cartesian product of these groups, then it works well.


Jan

On 11/16/20 8:39 PM, Jan Lukavský wrote:


Hi,

could this proposal be generalized to annotations of PCollections as 
well? Maybe that reduces to several types of annotations of a 
PTransform - e.g.


 a) runtime annotations of a PTransform (these might be scheduling 
hints - e.g. schedule this task to nodes with GPUs, etc.)


 b) output annotations - i.e. annotations that actually apply to 
PCollections, as every PCollection has at most one producer (this is 
what has actually been discussed in the referenced mailing list threads)


It would be cool if this added the option to do PTransform expansions 
based on annotations of input PCollections. We tried to play with this 
in Euphoria DSL, but it turned out it would fit best in the Beam SDK.


An example of input-annotation-sensitive expansion might be CoGBK: when 
one side is annotated, e.g. FitsInMemoryPerWindow (or SmallPerWindow, 
or whatever), then CoGBK might be expanded using a broadcast instead of 
a full shuffle.


I absolutely agree that all this must not have anything to do with 
semantics and correctness, and thus might be safely ignored. That might 
answer @Reuven's last question: when there are conflicting annotations, 
it would be possible to simply drop them as a last resort.


Jan

On 11/16/20 8:13 PM, Robert Burke wrote:
I imagine it has everything to do with the specific annotation to 
define that.


The runner notionally doesn't need to do anything with them, as they 
are optional, and not required for correctness.


On Mon, Nov 16, 2020, 10:56 AM Reuven Lax wrote:


PTransforms are hierarchical - namely a PTransform contains other
PTransforms, and so on. Is the runner expected to resolve all
annotations down to leaf nodes? What happens if that results in
conflicting annotations?

On Mon, Nov 16, 2020 at 10:54 AM Robert Burke <rob...@frantil.com> wrote:

That's a good question.

I think the main difference is a matter of scope. Annotations
would apply to a PTransform while an environment applies to
sets of transforms. A difference is the optional nature of
the annotations: they don't affect correctness. Runners don't
need to do anything with them and can still execute the
pipeline correctly.

Consider a privacy analysis on a pipeline graph. An
annotation indicating that a transform provides a certain
level of anonymization can be used in an analysis to
determine if the downstream transforms are encountering raw
data or not.

From my understanding (which can be wrong) environments are
rigid. Transforms in different environments can't be fused.
"This is the python env", "this is the java env" can't be
merged together. It's not clear to me that we have defined
when environments are safely fuseable outside of equality.
There's value in that simplicity.

AFAICT, environment has less to do with the machines a
pipeline is executing on than with the kinds of SDK
pipelines it understands and can execute.



On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova
<chad...@gmail.com> wrote:


Another example of an optional annotation is marking
a transform to run on secure hardware, or to give
hints to profiling/dynamic analysis tools.


There seems to be a lot of overlap between this idea and
Environments.  Can you talk about how you feel they may
be different or related?  For example, I could see
annotations as a way of tagging transforms with an
Environment, or I could see Environments becoming a
specialized form of annotation.

-chad



Re: PTransform Annotations Proposal

2020-11-16 Thread Jan Lukavský

Hi,

could this proposal be generalized to annotations of PCollections as 
well? Maybe that reduces to several types of annotations of a PTransform 
- e.g.


 a) runtime annotations of a PTransform (these might be scheduling hints 
- e.g. schedule this task to nodes with GPUs, etc.)


 b) output annotations - i.e. annotations that actually apply to 
PCollections, as every PCollection has at most one producer (this is 
what has actually been discussed in the referenced mailing list threads)


It would be cool if this added the option to do PTransform expansions based 
on annotations of input PCollections. We tried to play with this in 
Euphoria DSL, but it turned out it would fit best in the Beam SDK.


An example of input-annotation-sensitive expansion might be CoGBK: when one 
side is annotated, e.g. FitsInMemoryPerWindow (or SmallPerWindow, or 
whatever), then CoGBK might be expanded using a broadcast instead of a full 
shuffle.


I absolutely agree that all this must not have anything to do with 
semantics and correctness, and thus might be safely ignored. That might 
answer @Reuven's last question: when there are conflicting annotations, it 
would be possible to simply drop them as a last resort.
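
To sketch what the CoGBK expansion above could look like in Java (the
annotation URN and the hasAnnotation() lookup are hypothetical -
PCollection-level annotations don't exist today):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class AnnotationAwareJoin {
  static final String FITS_IN_MEMORY_URN =
      "example:annotation:fits_in_memory_per_window:v1"; // hypothetical URN

  static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> join(
      PCollection<KV<K, V1>> left, PCollection<KV<K, V2>> right) {
    if (hasAnnotation(right, FITS_IN_MEMORY_URN)) {
      // Broadcast expansion: materialize the annotated (small) side as a
      // multimap side input instead of shuffling both sides.
      PCollectionView<Map<K, Iterable<V2>>> rightView = right.apply(View.asMultimap());
      return left.apply(
          ParDo.of(
                  new DoFn<KV<K, V1>, KV<K, KV<V1, V2>>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      Iterable<V2> matches =
                          c.sideInput(rightView).get(c.element().getKey());
                      if (matches == null) {
                        return;
                      }
                      for (V2 v2 : matches) {
                        c.output(
                            KV.of(c.element().getKey(), KV.of(c.element().getValue(), v2)));
                      }
                    }
                  })
              .withSideInputs(rightView));
    }
    // Without the annotation, fall back to the usual CoGBK (full shuffle)
    // expansion, elided here for brevity.
    throw new UnsupportedOperationException("CoGBK expansion elided");
  }

  static boolean hasAnnotation(PCollection<?> pc, String urn) {
    return false; // hypothetical lookup; stands in for the proposed metadata
  }
}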


Jan

On 11/16/20 8:13 PM, Robert Burke wrote:
I imagine it has everything to do with the specific annotation to 
define that.


The runner notionally doesn't need to do anything with them, as they 
are optional, and not required for correctness.


On Mon, Nov 16, 2020, 10:56 AM Reuven Lax wrote:


PTransforms are hierarchical - namely a PTransform contains other
PTransforms, and so on. Is the runner expected to resolve all
annotations down to leaf nodes? What happens if that results in
conflicting annotations?

On Mon, Nov 16, 2020 at 10:54 AM Robert Burke <rob...@frantil.com> wrote:

That's a good question.

I think the main difference is a matter of scope. Annotations
would apply to a PTransform while an environment applies to
sets of transforms. A difference is the optional nature of the
annotations: they don't affect correctness. Runners don't need
to do anything with them and can still execute the pipeline
correctly.

Consider a privacy analysis on a pipeline graph. An annotation
indicating that a transform provides a certain level of
anonymization can be used in an analysis to determine if the
downstream transforms are encountering raw data or not.

From my understanding (which can be wrong) environments are
rigid. Transforms in different environments can't be fused.
"This is the python env", "this is the java env" can't be
merged together. It's not clear to me that we have defined
when environments are safely fuseable outside of equality.
There's value in that simplicity.

AFAICT, environment has less to do with the machines a pipeline
is executing on than with the kinds of SDK pipelines it
understands and can execute.



On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova
<chad...@gmail.com> wrote:


Another example of an optional annotation is marking a
transform to run on secure hardware, or to give hints
to profiling/dynamic analysis tools.


There seems to be a lot of overlap between this idea and
Environments.  Can you talk about how you feel they may be
different or related? For example, I could see annotations
as a way of tagging transforms with an Environment, or I
could see Environments becoming a specialized form of
annotation.

-chad



Re: PTransform Annotations Proposal

2020-11-16 Thread Robert Burke
I imagine it has everything to do with the specific annotation to define
that.

The runner notionally doesn't need to do anything with them, as they are
optional, and not required for correctness.

On Mon, Nov 16, 2020, 10:56 AM Reuven Lax  wrote:

> PTransforms are hierarchical - namely a PTransform contains other
> PTransforms, and so on. Is the runner expected to resolve all annotations
> down to leaf nodes? What happens if that results in conflicting annotations?
>
> On Mon, Nov 16, 2020 at 10:54 AM Robert Burke  wrote:
>
>> That's a good question.
>>
>> I think the main difference is a matter of scope. Annotations would apply
>> to a PTransform while an environment applies to sets of transforms. A
>> difference is the optional nature of the annotations: they don't affect
>> correctness. Runners don't need to do anything with them and can still
>> execute the pipeline correctly.
>>
>> Consider a privacy analysis on a pipeline graph. An annotation indicating
>> that a transform provides a certain level of anonymization can be used in
>> an analysis to determine if the downstream transforms are encountering raw
>> data or not.
>>
>> From my understanding (which can be wrong) environments are rigid.
>> Transforms in different environments can't be fused. "This is the python
>> env", "this is the java env" can't be merged together. It's not clear to me
>> that we have defined when environments are safely fuseable outside of
>> equality. There's value in that simplicity.
>>
>> AFAICT, environment has less to do with the machines a pipeline is
>> executing on than with the kinds of SDK pipelines it understands
>> and can execute.
>>
>>
>>
>> On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:
>>
>>>
>>>> Another example of an optional annotation is marking a transform to run
>>>> on secure hardware, or to give hints to profiling/dynamic analysis tools.

>>>
>>> There seems to be a lot of overlap between this idea and Environments.
>>> Can you talk about how you feel they may be different or related?  For
>>> example, I could see annotations as a way of tagging transforms with an
>>> Environment, or I could see Environments becoming a specialized form of
>>> annotation.
>>>
>>> -chad
>>>
>>>


Re: PTransform Annotations Proposal

2020-11-16 Thread Reuven Lax
PTransforms are hierarchical - namely a PTransform contains other
PTransforms, and so on. Is the runner expected to resolve all annotations
down to leaf nodes? What happens if that results in conflicting annotations?

On Mon, Nov 16, 2020 at 10:54 AM Robert Burke  wrote:

> That's a good question.
>
> I think the main difference is a matter of scope. Annotations would apply
> to a PTransform while an environment applies to sets of transforms. A
> difference is the optional nature of the annotations: they don't affect
> correctness. Runners don't need to do anything with them and can still
> execute the pipeline correctly.
>
> Consider a privacy analysis on a pipeline graph. An annotation indicating
> that a transform provides a certain level of anonymization can be used in
> an analysis to determine if the downstream transforms are encountering raw
> data or not.
>
> From my understanding (which can be wrong) environments are rigid.
> Transforms in different environments can't be fused. "This is the python
> env", "this is the java env" can't be merged together. It's not clear to me
> that we have defined when environments are safely fuseable outside of
> equality. There's value in that simplicity.
>
> AFAICT, environment has less to do with the machines a pipeline is
> executing on than with the kinds of SDK pipelines it understands
> and can execute.
>
>
>
> On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:
>
>>
>>> Another example of an optional annotation is marking a transform to run
>>> on secure hardware, or to give hints to profiling/dynamic analysis tools.
>>>
>>
>> There seems to be a lot of overlap between this idea and Environments.
>> Can you talk about how you feel they may be different or related?  For
>> example, I could see annotations as a way of tagging transforms with an
>> Environment, or I could see Environments becoming a specialized form of
>> annotation.
>>
>> -chad
>>
>>


Re: PTransform Annotations Proposal

2020-11-16 Thread Robert Burke
That's a good question.

I think the main difference is a matter of scope. Annotations would apply
to a PTransform while an environment applies to sets of transforms. A
difference is the optional nature of the annotations: they don't affect
correctness. Runners don't need to do anything with them and can still
execute the pipeline correctly.

Consider a privacy analysis on a pipeline graph. An annotation indicating
that a transform provides a certain level of anonymization can be used in
an analysis to determine if the downstream transforms are encountering raw
data or not.

From my understanding (which can be wrong) environments are rigid.
Transforms in different environments can't be fused. "This is the python
env", "this is the java env" can't be merged together. It's not clear to me
that we have defined when environments are safely fuseable outside of
equality. There's value in that simplicity.

AFAICT, environment has less to do with the machines a pipeline is executing
on than with the kinds of SDK pipelines it understands and can
execute.
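
To make the privacy example concrete, a hedged sketch of such an analysis
over the pipeline proto, assuming the proposed annotations field exists on
PTransform (the URN is hypothetical, and a full analysis would propagate
the property transitively downstream):

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class PrivacyAnalysis {
  // Hypothetical URN meaning "this transform's output is anonymized".
  static final String ANON_URN = "example:annotation:anonymized:v1";

  // Collects the ids of PCollections produced directly by annotated transforms.
  static Set<String> anonymizedOutputs(RunnerApi.Pipeline pipeline) {
    Set<String> anonymized = new HashSet<>();
    pipeline
        .getComponents()
        .getTransformsMap()
        .forEach(
            (id, transform) -> {
              if (transform.getAnnotationsMap().containsKey(ANON_URN)) {
                anonymized.addAll(transform.getOutputsMap().values());
              }
            });
    return anonymized;
  }
}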



On Mon, Nov 16, 2020, 10:36 AM Chad Dombrova  wrote:

>
>> Another example of an optional annotation is marking a transform to run
>> on secure hardware, or to give hints to profiling/dynamic analysis tools.
>>
>
> There seems to be a lot of overlap between this idea and Environments.
> Can you talk about how you feel they may be different or related?  For
> example, I could see annotations as a way of tagging transforms with an
> Environment, or I could see Environments becoming a specialized form of
> annotation.
>
> -chad
>
>


Re: PTransform Annotations Proposal

2020-11-16 Thread Chad Dombrova
>
>
> Another example of an optional annotation is marking a transform to run on
> secure hardware, or to give hints to profiling/dynamic analysis tools.
>

There seems to be a lot of overlap between this idea and Environments.  Can
you talk about how you feel they may be different or related?  For example,
I could see annotations as a way of tagging transforms with an Environment,
or I could see Environments becoming a specialized form of annotation.

-chad


Beam Dependency Check Report (2020-11-16)

2020-11-16 Thread Apache Jenkins Server

High Priority Dependency Updates Of Beam Python SDK:

Dependency Name | Current Version | Latest Version | Release Date Of the Current Used Version | Release Date Of The Latest Release | JIRA Issue
chromedriver-binary | 86.0.4240.22.0 | 87.0.4280.20.0 | 2020-09-07 | 2020-10-19 | BEAM-10426
dill | 0.3.1.1 | 0.3.3 | 2019-10-07 | 2020-11-02 | BEAM-11167
google-cloud-bigquery | 1.28.0 | 2.3.1 | 2020-10-05 | 2020-11-09 | BEAM-5537
google-cloud-build | 2.0.0 | 3.0.0 | 2020-11-09 | 2020-11-09 | BEAM-11204
google-cloud-datastore | 1.15.3 | 2.0.0 | None | 2020-11-16 | BEAM-8443
google-cloud-dlp | 1.0.0 | 2.0.0 | 2020-06-29 | 2020-10-05 | BEAM-10344
google-cloud-language | 1.3.0 | 2.0.0 | 2020-10-26 | 2020-10-26 | BEAM-8
google-cloud-pubsub | 1.7.0 | 2.1.0 | 2020-07-20 | 2020-10-05 | BEAM-5539
google-cloud-spanner | 1.19.1 | 2.0.0 | None | 2020-11-16 | BEAM-10345
google-cloud-vision | 1.0.0 | 2.0.0 | 2020-03-24 | 2020-10-05 | BEAM-9581
grpcio-tools | 1.30.0 | 1.33.2 | 2020-06-29 | 2020-11-02 | BEAM-9582
mock | 2.0.0 | 4.0.2 | 2019-05-20 | 2020-10-05 | BEAM-7369
mypy-protobuf | 1.18 | 1.23 | 2020-03-24 | 2020-06-29 | BEAM-10346
nbconvert | 5.6.1 | 6.0.7 | 2020-10-05 | 2020-10-05 | BEAM-11007
Pillow | 7.2.0 | 8.0.1 | 2020-10-19 | 2020-10-26 | BEAM-11071
PyHamcrest | 1.10.1 | 2.0.2 | 2020-01-20 | 2020-07-08 | BEAM-9155
pytest | 4.6.11 | 6.1.2 | 2020-07-08 | 2020-11-02 | BEAM-8606
pytest-xdist | 1.34.0 | 2.1.0 | 2020-08-17 | 2020-08-28 | BEAM-10713
tenacity | 5.1.5 | 6.2.0 | 2019-11-11 | 2020-06-29 | BEAM-8607
High Priority Dependency Updates Of Beam Java SDK:

Dependency Name | Current Version | Latest Version | Release Date Of the Current Used Version | Release Date Of The Latest Release | JIRA Issue
com.datastax.cassandra:cassandra-driver-core | 3.10.2 | 4.0.0 | 2020-08-26 | 2019-03-18 | BEAM-8674
com.esotericsoftware:kryo | 4.0.2 | 5.0.0 | 2018-03-20 | 2020-10-18 | BEAM-5809
com.esotericsoftware.kryo:kryo | 2.21 | 2.24.0 | 2013-02-27 | 2014-05-04 | BEAM-5574
com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin | 0.33.0 | 0.36.0 | 2020-09-14 | 2020-11-09 | BEAM-6645
com.github.jk1.dependency-license-report:com.github.jk1.dependency-license-report.gradle.plugin | 1.13 | 1.16 | 2020-06-29 | 2020-10-26 | BEAM-11120
com.google.api.grpc:grpc-google-common-protos | 1.18.1 | 2.0.1 | 2020-08-11 | 2020-11-02 | BEAM-8633
com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1 | 2.0.2 | 3.0.3 | None | 2020-11-16 | BEAM-8682
com.google.api.grpc:proto-google-common-protos | 1.18.1 | 2.0.1 | 2020-08-11 | 2020-11-02 | BEAM-6899
com.google.apis:google-api-services-bigquery | v2-rev20200916-1.30.10 | v2-rev20201030-1.30.10 | 2020-09-30 | 2020-11-06 | BEAM-8684
com.google.apis:google-api-services-clouddebugger | v2-rev20200501-1.30.10 | v2-rev20200807-1.30.10 | 2020-07-14 | 2020-08-17 | BEAM-8750
com.google.apis:google-api-services-cloudresourcemanager | v1-rev20200720-1.30.10 | v2-rev2020-1.30.10 | 2020-07-25 | 2020-11-12 | BEAM-8751
com.google.apis:google-api-services-dataflow | v1b3-rev20200713-1.30.10 | v1beta3-rev12-1.20.0 | 2020-07-25 | 2015-04-29 | BEAM-8752
com.google.apis:google-api-services-healthcare | v1beta1-rev20200713-1.30.10 | v1-rev20201104-1.30.10 | 2020-07-24 | 2020-11-10 | BEAM-10349
com.google.apis:google-api-services-pubsub | v1-rev20200713-1.30.10 | v1-rev20201101-1.30.10 | 2020-07-25 | 2020-11-06 | BEAM-8753
com.google.apis:google-api-services-storage | v1-rev20200814-1.30.10 | v1-rev20201106-1.30.10 | 2020-09-07 | 2020-11-11 | BEAM-8754
com.google.auto.service:auto-service | 1.0-rc6 | 1.0-rc7 | 2019-07-16 | 2020-05-13 | BEAM-5541
com.google.auto.service:auto-service-annotations | 1.0-rc6 | 1.0-rc7 | 2019-07-16 | 2020-05-13 | BEAM-10350
com.google.cloud:google-cloud-dlp | 1.1.4 | 2.2.0 | 2020-05-04 | 2020-10-31 | BEAM-10352

Re: PTransform Annotations Proposal

2020-11-16 Thread Reza Ardeshir Rokni
+1 having a NeedsRam(x) annotation would be incredibly helpful.
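
For illustration, a hedged sketch of how a runner might consume such a hint
through the proposed map<string, bytes> field. The URN and the payload
encoding are made up, and the getAnnotationsMap() accessor assumes the
proposed field actually lands in beam_runner_api.proto:

import java.nio.ByteBuffer;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class RamHintReader {
  static final String MIN_RAM_URN = "example:annotation:min_ram_bytes:v1"; // hypothetical

  // Returns the requested RAM in bytes, or -1 if the hint is absent. Because
  // the annotation is advisory, a runner that skips this lookup entirely
  // still executes the pipeline correctly.
  static long minRamBytes(RunnerApi.PTransform transform) {
    if (!transform.getAnnotationsMap().containsKey(MIN_RAM_URN)) {
      return -1;
    }
    byte[] payload = transform.getAnnotationsMap().get(MIN_RAM_URN).toByteArray();
    return ByteBuffer.wrap(payload).getLong(); // assumes a big-endian int64 payload
  }
}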

On Fri, 13 Nov 2020 at 05:57, Robert Burke  wrote:

> (Disclaimer, Mirac and their team did approach me about this beforehand as
> their interest is in the Go SDK.)
>
> +1 I think it's a good idea. As you've pointed out, there are many
> opportunities for optional pipeline analysis here as well.
>
> A strawman counterpoint would be to reuse the static DisplayData for
> this kind of thing, but I think that's not necessarily the same thing. It's
> very hard to get something that's purely intended for human consumption to
> also be suitable for machine consumption, without various adapters and
> such, and it would be an awful hack. Having something specifically for
> machines to understand is valuable in and of itself.
>
> I appreciate the versatility of simply using known URNs and their defined
> formats, and especially keeping the proposal to optional annotations that
> don't affect correctness. This will work well with most DoFns that need
> specialized hardware. They can usually be emulated on ordinary CPUs, which
> is good for testing, but can perform much better if the hardware is
> available. This also allows the runners to move execution of specific DoFns
> to the machines with the specialized hardware, for better scheduling of
> resources.
>
> I look forward to the PR, and before then, all the discussion the
> community has about this new field in the model proto.
>
>
>
>
>
> On Thu, 12 Nov 2020 at 09:41, Mirac Vuslat Basaran 
> wrote:
>
>> Hi all,
>>
>> We would like to propose adding functionality to add annotations to Beam
>> transforms. These annotations would be readable by the runner, and the
>> runner could then act on this information; for example by doing some
>> special resource allocation. There have been discussions around annotations
>> (or hints as they are sometimes called) in the past (
>> https://lists.apache.org/thread.html/rdf247cfa3a509f80578f03b2454ea1e50474ee3576a059486d58fdf4%40%3Cdev.beam.apache.org%3E,
>>
>> https://lists.apache.org/thread.html/fc090d8acd96c4cf2d23071b5d99f538165d3ff7fbe6f65297655309%40%3Cdev.beam.apache.org%3E).
>> This proposal aims to come up with an accepted lightweight solution with a
>> follow-up Pull Request to implement it in Go.
>>
>> By annotations, we refer to optional information / hints provided to the
>> runner. This proposal explicitly excludes “required” annotations that could
>> cause incorrect output. A runner that does not understand the annotations
>> and ignores them must still produce correct output, with perhaps a
>> degradation in performance or other nonfunctional requirements. Supporting
>> only “optional” annotations allows for compatibility with runners that do
>> not recognize those annotations.
>>
>> A good example of an optional annotation is marking a transform to be run
>> on GPU or TPU or that it needs a certain amount of RAM. If the runner knows
>> about this annotation, it can then allocate the requested resources for
>> that transform only to improve performance and avoid using these scarce
>> resources for other transforms.
>>
>> Another example of an optional annotation is marking a transform to run
>> on secure hardware, or to give hints to profiling/dynamic analysis tools.
>>
>> In all these cases, the runner can run the pipeline with or without the
>> annotation, and in both cases the same output would be produced. There
>> would be differences in nonfunctional requirements (performance, security,
>> ease of profiling), hence the optional part.
>>
>> A counter-example that this proposal explicitly excludes is marking a
>> transform as requiring sorted input. For example, on a transform that
>> expects time-sorted input in order to produce the correct output. If the
>> runner ignores this requirement, it would risk producing an incorrect
>> output. In order to avoid this, we exclude these required annotations.
>>
>> Implementation-wise, we propose to add a field:
>>  - map<string, bytes> annotations = 8;
>> to PTransform proto (
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L127).
>> The key would be a URN that uniquely identifies the type of annotation. The
>> value is an opaque byte array (e.g., a serialized protocol buffer) to allow
>> for maximum flexibility to the implementation of that specific type of
>> annotation.
>>
>> We have a specific interest in adding this to the Go SDK. In Go, the user
>> would specify the annotations to a structural ParDo as follows, by defining
>> a field:
>>  - Annotations map[string][]byte
>> and filling it out. For simplicity, we will only support structural DoFns
>> in Go for the time being.
>>
>> The runners could then read the annotations from the PTransform proto and
>> support the annotations that they would like to in the way they want.
>>
>> Please let me know what you think, and what would be the best way to
>> proceed, e.g., we can share a small design doc or, in case 

Re: Question about saving data to use across runner's instances

2020-11-16 Thread Reza Ardeshir Rokni
Hi,

Do you have an upper bound on how large the file will become? If it's small
enough to fit into a side input, you may be able to make use of the slowly
updating side input pattern:
https://beam.apache.org/documentation/patterns/side-inputs/

If not, then a stateful DoFn would be a good choice, but note that a
stateful DoFn is per key/window. Is there a natural key in the data that
you can use? If yes, something like this pattern may be useful for your use
case: streaming joins in a recommendation system.

In terms of persisting the file, you may want to create a branch in the
pipeline and, every time you update the file data, write the file out to an
object store, which you can read from if the pipeline needs to be restarted
or crashes.
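
For the side-input route, a minimal hedged sketch of that slowly updating
pattern in Java (the loadFile() helper and the 5-minute refresh interval
are illustrative):

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

class SlowUpdatingSideInput {
  static PCollectionView<Map<String, String>> fileView(Pipeline p) {
    return p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(
            "ReloadFile",
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  @ProcessElement
                  public void process(OutputReceiver<Map<String, String>> out) {
                    out.output(loadFile()); // hypothetical helper: re-reads the file
                  }
                }))
        .apply(View.asSingleton());
  }

  static Map<String, String> loadFile() {
    return Collections.emptyMap(); // placeholder for the real file read
  }
}

Downstream transforms would take this via .withSideInputs(fileView) and read
it with c.sideInput(fileView) inside their DoFn.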

Cheers
Reza

On Mon, 16 Nov 2020 at 04:48, Artur Khanin  wrote:

> Hi all,
>
> I am designing a Dataflow pipeline in Java that has to:
>
>- Read a file (it may be pretty large) during initialization and then
>store it in some sort of shared memory
>- Periodically update this file
>    - Make this file available to read across all of the runner's instances
>    - Persist this file in case of restarts/crashes/scale-up/scale-down
>
>
> I found some information about stateful processing in Beam using a Stateful
> DoFn. Is it an appropriate way to handle such functionality, or is there a
> better approach for it?
>
> Any help or information is very appreciated!
>
> Thanks,
> Artur Khanin
> Akvelon, Inc.
>
>