[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=89141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89141
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 09/Apr/18 21:31
Start Date: 09/Apr/18 21:31
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379900371
 
 
   This broke the BigtableWriteIT because you can't convert a Joda DateTime to 
a Java Date for formatting. Fixed in: https://github.com/apache/beam/pull/5066
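
   The actual fix is in the linked PR; as a rough illustration of the
   incompatibility (not the code from that PR), SimpleDateFormat only formats
   java.util.Date/Number values, so a Joda DateTime has to be converted first
   (for example via toDate()) or formatted with Joda's own formatter:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;

public class JodaFormattingSketch {
  public static void main(String[] args) {
    DateTime jodaTime = DateTime.now();

    // SimpleDateFormat rejects a Joda DateTime, so convert to java.util.Date first.
    Date javaDate = jodaTime.toDate();
    String viaJavaDate = new SimpleDateFormat("yyyyMMdd").format(javaDate);

    // Or sidestep java.util.Date entirely and format with Joda's formatter.
    String viaJoda = DateTimeFormat.forPattern("yyyyMMdd").print(jodaTime);

    System.out.println(viaJavaDate + " " + viaJoda);
  }
}
```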


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 89141)
Time Spent: 9h  (was: 8h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88218&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88218
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 19:55
Start Date: 05/Apr/18 19:55
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379057477
 
 
   Yea when green LGTM for merge from me. Incidentally it might be a good idea 
to move work from said branch to master just to get finer grained review. This 
PR probably had somewhere between 2 and 5 PRs worth of orthogonal changes in it 
(like switching things to joda time, moves, renames, and also the new 
functionality).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88218)
Time Spent: 8h 40m  (was: 8.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88217
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 19:51
Start Date: 05/Apr/18 19:51
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #4964: [BEAM-3437] Introduce 
Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379056365
 
 
   Yes, lgtm from me, merge when ready


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88217)
Time Spent: 8.5h  (was: 8h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88216&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88216
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 19:47
Start Date: 05/Apr/18 19:47
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379055305
 
 
   FYI that is exactly the point of this work. However the work to make
   schemas work on non-Row objects is still in the branch I have with JB. We
   decided it made sense to merge just this part of it so that we don't create
   huge conflicts with BeamSQL work.
   
   On Thu, Apr 5, 2018 at 12:43 PM Kenn Knowles 
   wrote:
   
   > OK - if it doesn't exist in the portable model and is just a feature of
   > the Java SDK then that is a solid avenue for exploration. For SQL we
   > actually want perhaps a lot more than this, as we'd like embedded SQL
   > transforms to be able to input and output non-Row types generically based
   > on schema.
   >
   > —
   > You are receiving this because you authored the thread.
   > Reply to this email directly, view it on GitHub, or mute the thread.
   >
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88216)
Time Spent: 8h 20m  (was: 8h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88206&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88206
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 19:41
Start Date: 05/Apr/18 19:41
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379053692
 
 
   OK - if it doesn't exist in the portable model and is just a feature of the 
Java SDK then that is a solid avenue for exploration. For SQL we actually want 
perhaps a lot more than this, as we'd like embedded SQL transforms to be able 
to input and output non-Row types generically based on schema.
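
   As a hedged sketch of what "input and output non-Row types generically
   based on schema" could look like in user code (the Transaction POJO and
   field names are invented, and the Schema/Row builder calls follow the shape
   they later took in the Java SDK rather than this PR), a schema-aware layer
   would perform this kind of POJO-to-Row conversion automatically around an
   embedded SQL transform:

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

/** Illustrative user type; under the proposal it would carry a schema itself. */
class Transaction {
  String bank;
  double amount;
}

/** Manual POJO-to-Row conversion that a schema-aware SQL transform could infer. */
class TransactionToRow extends DoFn<Transaction, Row> {
  static final Schema SCHEMA =
      Schema.builder().addStringField("bank").addDoubleField("amount").build();

  @ProcessElement
  public void processElement(ProcessContext c) {
    Transaction txn = c.element();
    c.output(Row.withSchema(SCHEMA).addValues(txn.bank, txn.amount).build());
  }
}
```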


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88206)
Time Spent: 8h 10m  (was: 8h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88175&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88175
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:07
Start Date: 05/Apr/18 18:07
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379026543
 
 
   On Thu, Apr 5, 2018 at 11:02 AM Kenn Knowles 
   wrote:
   
   > It seems like this is a good idea that needs lots of baking. That will
   > work best once it is in. How about we build a document with notes on
   > follow-ups or an umbrella JIRA with subtasks? Otherwise I'm concerned the
   > collection of things we want to look into more specifically may get lost.
   >
   > Being totally frank, the code seems fine while the fundamentals of what a
   > schema is are where I still have the most questions, especially as pertains
   > to portability. At the portability layer, encodings (coders) and types are
   > synonymous. In a particular language, there is the language's types that
   > come from coders. Then each SQL dialect has its own notion of standard
   > types that need not correspond to any general purpose language's. And of
   > course Avro and Proto have their own encoding-to-language mappings to
   > contend with. I really don't think Beam should add another.
   >
   
   FYI, the simple answer is that at the portability layer the only type is
   Row - individual schema fields don't exist as separate types at the
   portability layer. And in truth the fact that we currently "encode" fields
   using coders is a potentially temporary implementation detail.
   
   > So I want to get this in as experimental but continue work before we have
   > lots of dependencies on schemas. So [image: :lgtm:] pending a followup
   > document or JIRA.
   > --
   >
   > Reviewed 79 of 150 files at r1, 4 of 20 files at r4, 11 of 29 files at r5,
   > 1 of 11 files at r6, 20 of 32 files at r7, 1 of 1 files at r8, 32 of 33
   > files at r9.
   > Review status: all files reviewed at latest revision, 3 unresolved
   > discussions.
   > --
   >
   > *sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
   > line 39 at r9 (raw file):*
   >
   > @Experimental
   > public class RowCoder extends CustomCoder {
   >   private static final Map CODER_MAP = ImmutableMap.builder()
   >
   > These should probably be defaults, not hardcoded.
   > --
   >
   > *sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
   > line 52 at r9 (raw file):*
   >
   >   .build();
   >
   >   private static final Map ESTIMATED_FIELD_SIZES =
   >
   > Units in the name - at usage sites it will not be clear what they are.
   > --
   >
   > *sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
   > line 79 at r9 (raw file):*
   >
   >* Return the estimated serialized size of a give row object.
   >*/
   >   public static long estimatedSizeBytes(Row row) {
   >
   > And given the particular field coders being per-instance, this would be a
   > non-static method, etc.
   > --
   >
   > *sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java, line 296
   > at r9 (raw file):*
   >
   >  * PCollection transactions = ...;
   >  * transactions.apply(FileIO.

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88174
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:02
Start Date: 05/Apr/18 18:02
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379024918
 
 
   It seems like this is a good idea that needs lots of baking. That will work 
best once it is in. How about we build a document with notes on follow-ups or 
an umbrella JIRA with subtasks? Otherwise I'm concerned the collection of 
things we want to look into more specifically may get lost.
   
   Being totally frank, the code seems fine while the fundamentals of what a 
schema is are where I still have the most questions, especially as pertains to 
portability. At the portability layer, encodings (coders) and types are 
synonymous. In a particular language, there is the language's types that come 
from coders. Then each SQL dialect has its own notion of standard types that 
need not correspond to any general purpose language's. And of course Avro and 
Proto have their own encoding-to-language mappings to contend with. I really 
don't think Beam should add another.
   
   So I want to get this in as experimental but continue work before we have 
lots of dependencies on schemas. So :lgtm: pending a followup document or JIRA.
   
   ---
   
   Reviewed 79 of 150 files at r1, 4 of 20 files at r4, 11 of 29 files at r5, 1 
of 11 files at r6, 20 of 32 files at r7, 1 of 1 files at r8, 32 of 33 files at 
r9.
   Review status: all files reviewed at latest revision, 3 unresolved 
discussions.
   
   ---
   
   *[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java, 
line 39 at 
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LipYfDPrPNLA5iboh:-L9LipYfDPrPNLA5iboi:b-kcvp9c)
 ([raw 
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L39)):*
   > ```Java
   > @Experimental
   > public class RowCoder extends CustomCoder {
   >   private static final Map CODER_MAP = ImmutableMap.builder()
   > ```
   
   These should probably be defaults, not hardcoded.
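
   A hedged sketch of one way to read that suggestion (the class and method
   names, e.g. withCoderForType, are invented for illustration and the default
   mapping is arbitrary): keep the per-TypeName coders as defaults that a
   particular RowCoder instance may override, rather than a fixed static table.

```java
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.schemas.Schema.TypeName;

/** Sketch only: field coders seeded from defaults, overridable per instance. */
class FieldCoders {
  private static final Map<TypeName, Coder<?>> DEFAULTS =
      ImmutableMap.of(
          TypeName.INT32, VarIntCoder.of(),
          TypeName.STRING, StringUtf8Coder.of());

  // Each instance starts from the defaults but may override individual entries.
  private final Map<TypeName, Coder<?>> coders = new HashMap<>(DEFAULTS);

  FieldCoders withCoderForType(TypeName type, Coder<?> coder) {
    coders.put(type, coder);
    return this;
  }

  Coder<?> coderFor(TypeName type) {
    return coders.get(type);
  }
}
```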
   
   ---
   
   *[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java, 
line 52 at 
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LiaqH42LqLGYe_yRb:-L9LiaqH42LqLGYe_yRc:bz5ztdg)
 ([raw 
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L52)):*
   > ```Java
   >   .build();
   > 
   >   private static final Map ESTIMATED_FIELD_SIZES =
   > ```
   
   Units in the name - at usage sites it will not be clear what they are.
   
   ---
   
   *[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java, 
line 79 at 
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9Lit5n4nmMBm0PsKNI:-L9Lit5n4nmMBm0PsKNJ:bft3uyf)
 ([raw 
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L79)):*
   > ```Java
   >* Return the estimated serialized size of a give row object.
   >*/
   >   public static long estimatedSizeBytes(Row row) {
   > ```
   
   And given the particular field coders being per-instance, this would be a 
non-static method, etc.
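
   A hedged sketch combining this note and the previous one about units (the
   names are invented for illustration): the estimate becomes an instance
   method, and the units appear in the method and variable names.

```java
import org.apache.beam.sdk.values.Row;

/** Sketch only: estimated sizes as instance methods, with units in the names. */
abstract class RowSizeEstimator {
  /** Estimated serialized size of the given row, in bytes. */
  public long estimatedSizeBytes(Row row) {
    long sizeBytes = 0;
    for (int i = 0; i < row.getValues().size(); ++i) {
      sizeBytes += estimatedFieldSizeBytes(row, i);
    }
    return sizeBytes;
  }

  /** Per-field estimate in bytes; depends on this instance's field coders. */
  protected abstract long estimatedFieldSizeBytes(Row row, int fieldIndex);
}
```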
   
   ---
   
   *[sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java, line 296 
at 
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LiPXj0UF0Su-q34K4:-L9LiPXj0UF0Su-q34K5:b-kxeoth)
 ([raw 
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L296)):*
   > ```Java
   >  * PCollection transactions = ...;
   >  * transactions.apply(FileIO.writeDynamic()
   >  * .by(Transaction::getTypeName)
   > ```
   
   There are a bunch of tiny renames that I don't really get.
   
   ---
   
   *[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line 
18 at 
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H0EwC1Ny6SIRPGLDb:-L9H0EwC1Ny6SIRPGLDc:b-8ioiqz)
 ([raw 
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L18)):*
   > ```Java
   >  * limitations under the License.
   >  */
   > package org.apache.beam.sdk.schemas;
   > ```
   
   Just to keep things simple - is it useful to have a package boundary here? I 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88169&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88169
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 05/Apr/18 17:54
Start Date: 05/Apr/18 17:54
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379022343
 
 
   @akedin TypeName and FieldType are in Schema.java. If you think we should 
merge, is that a LGTM for now?
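
   For readers following along, a heavily abbreviated sketch of that layout
   (field handling and most members are omitted, so this is not the class from
   the PR): TypeName and FieldType are nested inside Schema rather than being
   top-level types.

```java
package org.apache.beam.sdk.schemas;

import java.io.Serializable;

/** Abbreviated sketch of the nesting described above. */
public class Schema implements Serializable {

  /** The coarse kind of a field (subset of the values mentioned in this thread). */
  public enum TypeName {
    BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE, STRING, DATETIME
  }

  /** A field's full type: its TypeName plus element types, metadata, and so on. */
  public static class FieldType implements Serializable {
    private final TypeName typeName;

    private FieldType(TypeName typeName) {
      this.typeName = typeName;
    }

    public static FieldType of(TypeName typeName) {
      return new FieldType(typeName);
    }

    public TypeName getTypeName() {
      return typeName;
    }
  }
}
```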


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88169)
Time Spent: 7h 40m  (was: 7.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-04 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87714&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87714
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 18:30
Start Date: 04/Apr/18 18:30
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-378700078
 
 
   @akedin addressed remaining comments


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87714)
Time Spent: 7.5h  (was: 7h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-04 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87701
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 18:17
Start Date: 04/Apr/18 18:17
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179236211
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 ##
 @@ -163,13 +164,13 @@ private static Table mockTable(String name, String type, 
String comment, JSONObj
 .columns(ImmutableList.of(
 Column.builder()
 .name("id")
-.coder(INTEGER)
+.typeDescriptor(TypeName.INT32.type())
 .primaryKey(false)
 .comment("id")
 .build(),
 Column.builder()
 .name("name")
-.coder(VARCHAR)
+.typeDescriptor(CalciteUtils.toFieldType(SqlTypeName.VARCHAR))
 
 Review comment:
   
   
   > **akedin** wrote:
   > I don't think that CalciteUtils should be used here, even if it's 
temporary. I'd rather have our own `SqlType.VARCHAR = 
TypeName.STRING.withMetadata("VARCHAR")`.
   
   
   Done. Added it to RowSqlTypes (so we can later remove the builder class 
there, but leave the defined types).
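
   A hedged sketch of what such constants in RowSqlTypes might look like,
   following the pattern quoted above (the withMetadata and type() calls are
   taken verbatim from this thread and may not match the merged signatures):

```java
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;

/** Sketch only: SQL-flavored type constants defined once, independent of Calcite. */
class RowSqlTypesSketch {
  // A SQL type is a core TypeName, tagged with its SQL name as metadata when needed.
  public static final FieldType VARCHAR = TypeName.STRING.withMetadata("VARCHAR");
  public static final FieldType INTEGER = TypeName.INT32.type();

  private RowSqlTypesSketch() {}
}
```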


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87701)
Time Spent: 7h 20m  (was: 7h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-04 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87700
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 18:17
Start Date: 04/Apr/18 18:17
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179236209
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
 ##
 @@ -44,7 +46,7 @@ public static Builder builder() {
   @AutoValue.Builder
   public abstract static class Builder {
 public abstract Builder name(String name);
-public abstract Builder coder(Coder coder);
+public abstract Builder typeDescriptor(FieldType fieldType);
 
 Review comment:
   
   
   > **akedin** wrote:
   > `type(FieldType)`
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87700)
Time Spent: 7h 10m  (was: 7h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-04 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87702
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 18:17
Start Date: 04/Apr/18 18:17
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179236212
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianShortCoder.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link BigEndianShortCoder} encodes {@link Short Shorts} in 4 bytes, 
big-endian.
+ */
+public class BigEndianShortCoder extends AtomicCoder<Short> {
+
+  public static BigEndianShortCoder of() {
+return INSTANCE;
+  }
+
+  /
+
+  private static final BigEndianShortCoder INSTANCE = new 
BigEndianShortCoder();
+  private static final TypeDescriptor<Short> TYPE_DESCRIPTOR = new 
TypeDescriptor<Short>() {};
+
+  private BigEndianShortCoder() {}
+
+  @Override
+  public void encode(Short value, OutputStream outStream) throws IOException {
+if (value == null) {
+  throw new CoderException("cannot encode a null Short");
+}
+new DataOutputStream(outStream).writeShort(value);
+  }
+
+  @Override
+  public Short decode(InputStream inStream)
+  throws IOException, CoderException {
+try {
+  return new DataInputStream(inStream).readShort();
+} catch (EOFException | UTFDataFormatException exn) {
+  // These exceptions correspond to decoding problems, so change
+  // what kind of exception they're branded as.
+  throw new CoderException(exn);
+}
+  }
+
+  @Override
+  public void verifyDeterministic() {}
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true}. This coder is injective.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true}, because {@link #getEncodedElementByteSize} runs in 
constant time.
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Short value) {
+return true;
+  }
+
+  @Override
+  public TypeDescriptor<Short> getEncodedTypeDescriptor() {
+return TYPE_DESCRIPTOR;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code 4}, the size in bytes of an short's big endian encoding.
 
 Review comment:
   
   
   > **akedin** wrote:
   > `{@code 2}`
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87702)
Time Spent: 7h 20m  (was: 7h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87426
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 04:59
Start Date: 04/Apr/18 04:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179025224
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
 ##
 @@ -44,7 +46,7 @@ public static Builder builder() {
   @AutoValue.Builder
   public abstract static class Builder {
 public abstract Builder name(String name);
-public abstract Builder coder(Coder coder);
+public abstract Builder typeDescriptor(FieldType fieldType);
 
 Review comment:
   `type(FieldType)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87426)
Time Spent: 7h  (was: 6h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87424&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87424
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 04:59
Start Date: 04/Apr/18 04:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179022576
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianShortCoder.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link BigEndianShortCoder} encodes {@link Short Shorts} in 4 bytes, 
big-endian.
+ */
+public class BigEndianShortCoder extends AtomicCoder<Short> {
+
+  public static BigEndianShortCoder of() {
+return INSTANCE;
+  }
+
+  /
+
+  private static final BigEndianShortCoder INSTANCE = new 
BigEndianShortCoder();
+  private static final TypeDescriptor<Short> TYPE_DESCRIPTOR = new 
TypeDescriptor<Short>() {};
+
+  private BigEndianShortCoder() {}
+
+  @Override
+  public void encode(Short value, OutputStream outStream) throws IOException {
+if (value == null) {
+  throw new CoderException("cannot encode a null Short");
+}
+new DataOutputStream(outStream).writeShort(value);
+  }
+
+  @Override
+  public Short decode(InputStream inStream)
+  throws IOException, CoderException {
+try {
+  return new DataInputStream(inStream).readShort();
+} catch (EOFException | UTFDataFormatException exn) {
+  // These exceptions correspond to decoding problems, so change
+  // what kind of exception they're branded as.
+  throw new CoderException(exn);
+}
+  }
+
+  @Override
+  public void verifyDeterministic() {}
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true}. This coder is injective.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true}, because {@link #getEncodedElementByteSize} runs in 
constant time.
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Short value) {
+return true;
+  }
+
+  @Override
+  public TypeDescriptor<Short> getEncodedTypeDescriptor() {
+return TYPE_DESCRIPTOR;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code 4}, the size in bytes of an short's big endian encoding.
 
 Review comment:
   `{@code 2}`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87424)
Time Spent: 6h 50m  (was: 6h 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87425
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 04/Apr/18 04:59
Start Date: 04/Apr/18 04:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r179026015
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 ##
 @@ -163,13 +164,13 @@ private static Table mockTable(String name, String type, 
String comment, JSONObj
 .columns(ImmutableList.of(
 Column.builder()
 .name("id")
-.coder(INTEGER)
+.typeDescriptor(TypeName.INT32.type())
 .primaryKey(false)
 .comment("id")
 .build(),
 Column.builder()
 .name("name")
-.coder(VARCHAR)
+.typeDescriptor(CalciteUtils.toFieldType(SqlTypeName.VARCHAR))
 
 Review comment:
   I don't think that CalciteUtils should be used here, even if it's temporary. 
I'd rather have our own `SqlType.VARCHAR = 
TypeName.STRING.withMetadata("VARCHAR")`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87425)
Time Spent: 6h 50m  (was: 6h 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87304
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 22:03
Start Date: 03/Apr/18 22:03
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178975398
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
 ##
 @@ -281,7 +281,7 @@ public void testIsNullAndIsNotNull() throws Exception {
   }
 
   @Override protected PCollection getTestPCollection() {
-RowType type = RowSqlType.builder()
+Schema type = RowSqlType.builder()
 
 Review comment:
   ack


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87304)
Time Spent: 6h 40m  (was: 6.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87302
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 22:02
Start Date: 03/Apr/18 22:02
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178975243
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianShortCoder.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link BigEndianShortCoder} encodes {@link Short Shorts} in 4 bytes, 
big-endian.
+ */
+public class BigEndianShortCoder extends AtomicCoder<Short> {
+
+  public static BigEndianShortCoder of() {
+return INSTANCE;
+  }
+
+  /
+
+  private static final BigEndianShortCoder INSTANCE = new 
BigEndianShortCoder();
+  private static final TypeDescriptor<Short> TYPE_DESCRIPTOR = new 
TypeDescriptor<Short>() {};
+
+  private BigEndianShortCoder() {}
+
+  @Override
+  public void encode(Short value, OutputStream outStream) throws IOException {
+if (value == null) {
+  throw new CoderException("cannot encode a null Short");
+}
+new DataOutputStream(outStream).writeShort(value);
 
 Review comment:
   Ack


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87302)
Time Spent: 6h 20m  (was: 6h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87301
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 22:02
Start Date: 03/Apr/18 22:02
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178975184
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 ##
 @@ -80,11 +81,11 @@ public void testSelectArrayValue() {
   public void testProjectArrayField() {
 PCollection input = pCollectionOf2Elements();
 
-RowType resultType =
+Schema resultType =
 RowSqlType
 .builder()
 .withIntegerField("f_int")
-.withArrayField("f_stringArr", SqlTypeCoders.VARCHAR)
+.withArrayField("f_stringArr", SqlTypeName.VARCHAR)
 
 Review comment:
   This builder will be removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87301)
Time Spent: 6h 10m  (was: 6h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87303&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87303
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 22:02
Start Date: 03/Apr/18 22:02
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178975317
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##
 @@ -75,191 +83,230 @@ public static Row nullRow(RowType rowType) {
* if type doesn't match.
*/
   public  T getValue(String fieldName) {
-return getValue(getRowType().indexOf(fieldName));
+return getValue(getSchema().indexOf(fieldName));
   }
 
   /**
* Get value by field index, {@link ClassCastException} is thrown
-   * if type doesn't match.
+   * if schema doesn't match.
*/
   @Nullable
   public  T getValue(int fieldIdx) {
 return (T) getValues().get(fieldIdx);
   }
 
   /**
-   * Get a {@link Byte} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#BYTE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Byte getByte(String fieldName) {
-return getValue(fieldName);
+  public byte getByte(String fieldName) {
+return getByte(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Short} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT16} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Short getShort(String fieldName) {
-return getValue(fieldName);
+  public short getInt16(String fieldName) {
+return getInt16(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Integer} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT32} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Integer getInteger(String fieldName) {
-return getValue(fieldName);
+  public int getInt32(String fieldName) {
+return getInt32(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Float} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT64} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Float getFloat(String fieldName) {
-return getValue(fieldName);
+  public long getInt64(String fieldName) {
+return getInt64(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Double} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DECIMAL} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Double getDouble(String fieldName) {
-return getValue(fieldName);
+  public BigDecimal getDecimal(String fieldName) {
+return getDecimal(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Long} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#FLOAT} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Long getLong(String fieldName) {
-return getValue(fieldName);
+  public float getFloat(String fieldName) {
+return getFloat(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link String} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DOUBLE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
+   */
+  public double getDouble(String fieldName) {
+return getDouble(getSchema().indexOf(fieldName));
+  }
+
+  /**
+   * Get a {@link TypeName#STRING} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
   public String getString(String fieldName) {
-return getValue(fieldName);
+return getString(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Date} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DATETIME} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Date getDate(String fieldName) {
-return getValue(fieldName);
+  public ReadableDateTime getDateTime(String fieldName) {
+return getDateTime(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87300
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 22:01
Start Date: 03/Apr/18 22:01
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178974984
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 ##
 @@ -80,11 +81,11 @@ public void testSelectArrayValue() {
   public void testProjectArrayField() {
     PCollection<Row> input = pCollectionOf2Elements();
 
-    RowType resultType =
+    Schema resultType =
         RowSqlType
             .builder()
             .withIntegerField("f_int")
-            .withArrayField("f_stringArr", SqlTypeCoders.VARCHAR)
+            .withArrayField("f_stringArr", SqlTypeName.VARCHAR)
 
 Review comment:
  I'll remove RowSqlTypeBuilder in a follow-up PR.
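
  As a rough illustration of that direction (purely a sketch: Field.of and withComponentType are assumed helper names here, not something this PR adds), the same schema built straight against the core Schema API might look like:

      Schema resultType =
          Schema.of(
              Field.of("f_int", TypeName.INT32.typeDescriptor()),
              Field.of("f_stringArr",
                  TypeName.ARRAY.typeDescriptor()
                      .withComponentType(TypeName.STRING.typeDescriptor())));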


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87300)
Time Spent: 6h  (was: 5h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87168=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87168
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 17:15
Start Date: 03/Apr/18 17:15
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178898184
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as 
nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87167=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87167
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 17:14
Start Date: 03/Apr/18 17:14
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178897881
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as 
nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87166=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87166
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 17:13
Start Date: 03/Apr/18 17:13
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178897537
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as 
nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
 
 Review comment:
   Done


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87164=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87164
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 17:10
Start Date: 03/Apr/18 17:10
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178896709
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
 ##
 @@ -153,6 +156,14 @@ public boolean accept() {
 }
   }
 
+  //
+  private static <T> T convertValue(T value, SqlTypeName typeName) {
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87164)
Time Spent: 5h 20m  (was: 5h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87163=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87163
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 17:08
Start Date: 03/Apr/18 17:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178896184
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
 ##
 @@ -17,69 +17,146 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
 
 /**
  *  A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each 
element directly.
  */
 @Experimental
 public class RowCoder extends CustomCoder {
+  private static final Map CODER_MAP = 
ImmutableMap.builder()
+  .put(TypeName.BYTE, ByteCoder.of())
+  .put(TypeName.INT16, BigEndianShortCoder.of())
+  .put(TypeName.INT32, BigEndianIntegerCoder.of())
+  .put(TypeName.INT64, BigEndianLongCoder.of())
+  .put(TypeName.DECIMAL, BigDecimalCoder.of())
+  .put(TypeName.FLOAT, FloatCoder.of())
+  .put(TypeName.DOUBLE, DoubleCoder.of())
+  .put(TypeName.STRING, StringUtf8Coder.of())
+  .put(TypeName.DATETIME, InstantCoder.of())
+  .put(TypeName.BOOLEAN, BooleanCoder.of())
+  .build();
+
+  private static final Map ESTIMATED_FIELD_SIZES =
+  ImmutableMap.builder()
+  .put(TypeName.BYTE, Byte.BYTES)
+  .put(TypeName.INT16, Short.BYTES)
+  .put(TypeName.INT32, Integer.BYTES)
+  .put(TypeName.INT64, Long.BYTES)
+  .put(TypeName.FLOAT, Float.BYTES)
+  .put(TypeName.DOUBLE, Double.BYTES)
+  .put(TypeName.DECIMAL, 32)
+  .put(TypeName.BOOLEAN, 1)
+  .put(TypeName.DATETIME, Long.BYTES)
+  .build();
+
   private static final BitSetCoder nullListCoder = BitSetCoder.of();
 
-  private RowType rowType;
-  private List coders;
+  private Schema schema;
+
+  /**
+   * Returns the coder used for a given primitive type.
+   */
+  public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+    return (Coder<T>) CODER_MAP.get(typeName);
+  }
+
+  /**
+   * Return the estimated serialized size of a given row object.
+   */
+  public static long estimatedSizeBytes(Row row) {
 
 Review comment:
  I think it is associated with RowCoder, because it is specific to how a Row is encoded by this coder. Logically the Row object knows nothing about its coder. In fact we might have multiple Coders for Row, and each one might have a different estimated size.
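
  To make that point concrete, a minimal sketch (the interface and class names are invented here, not part of this change): each coder for Row can expose its own notion of encoded size, which is why the estimate hangs off the coder rather than off Row.

      // Illustration only: invented names, not part of this PR.
      interface RowSizeEstimate {
        long estimatedSizeBytes(org.apache.beam.sdk.values.Row row);
      }

      // A fixed-width binary encoding might assume roughly 8 bytes per field ...
      final class FixedWidthRowSizeEstimate implements RowSizeEstimate {
        @Override
        public long estimatedSizeBytes(org.apache.beam.sdk.values.Row row) {
          return 8L * row.getSchema().getFieldCount();
        }
      }

      // ... while a text-based encoding would report something different for the same Row.
      final class TextRowSizeEstimate implements RowSizeEstimate {
        @Override
        public long estimatedSizeBytes(org.apache.beam.sdk.values.Row row) {
          return 16L * row.getSchema().getFieldCount();
        }
      }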


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87163)
Time Spent: 5h 10m  (was: 5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86865
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178678007
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+    public static final List<TypeName> NUMERIC_TYPES = ImmutableList.of(
+        BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
 
 Review comment:
   Why not sets?
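
  For reference, a sketch of the Set-based alternative being suggested, placed inside the TypeName enum (assuming java.util.Set and Guava's Sets.immutableEnumSet, alongside the com.google.common.collect imports already in the file); membership checks such as isNumericType() then become constant-time EnumSet lookups instead of list scans:

      public static final Set<TypeName> NUMERIC_TYPES =
          Sets.immutableEnumSet(BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);

      public boolean isNumericType() {
        // same call sites as today, just backed by an EnumSet
        return NUMERIC_TYPES.contains(this);
      }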


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86865)
Time Spent: 4h 40m  (was: 4.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86860
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178678158
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
 
 Review comment:
  Should we cache these? They're immutable, so it probably makes sense to keep an instance in a field on the `TypeName`.
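
  A sketch of what that caching could look like inside Schema (other members of the enum elided here); FieldTypeDescriptor.of(this) would then run once per constant instead of on every typeDescriptor() call:

      public enum TypeName {
        BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE, STRING, DATETIME, BOOLEAN, ARRAY, ROW;

        // cached once per constant; safe because FieldTypeDescriptor is immutable
        private final FieldTypeDescriptor descriptor = FieldTypeDescriptor.of(this);

        public FieldTypeDescriptor typeDescriptor() {
          return descriptor;
        }
      }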


This is an automated message from the Apache Git 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86856=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86856
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178651934
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
 ##
 @@ -153,6 +156,14 @@ public boolean accept() {
 }
   }
 
+  //
+  private static <T> T convertValue(T value, SqlTypeName typeName) {
 
 Review comment:
   I don't think this method is needed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86856)
Time Spent: 3.5h  (was: 3h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86862=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86862
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178679735
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as 
nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of this 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86863=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86863
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178676886
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
 ##
 @@ -17,69 +17,146 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
 
 /**
  *  A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each 
element directly.
  */
 @Experimental
 public class RowCoder extends CustomCoder {
+  private static final Map CODER_MAP = 
ImmutableMap.builder()
+  .put(TypeName.BYTE, ByteCoder.of())
+  .put(TypeName.INT16, BigEndianShortCoder.of())
+  .put(TypeName.INT32, BigEndianIntegerCoder.of())
+  .put(TypeName.INT64, BigEndianLongCoder.of())
+  .put(TypeName.DECIMAL, BigDecimalCoder.of())
+  .put(TypeName.FLOAT, FloatCoder.of())
+  .put(TypeName.DOUBLE, DoubleCoder.of())
+  .put(TypeName.STRING, StringUtf8Coder.of())
+  .put(TypeName.DATETIME, InstantCoder.of())
+  .put(TypeName.BOOLEAN, BooleanCoder.of())
+  .build();
+
+  private static final Map ESTIMATED_FIELD_SIZES =
+  ImmutableMap.builder()
+  .put(TypeName.BYTE, Byte.BYTES)
+  .put(TypeName.INT16, Short.BYTES)
+  .put(TypeName.INT32, Integer.BYTES)
+  .put(TypeName.INT64, Long.BYTES)
+  .put(TypeName.FLOAT, Float.BYTES)
+  .put(TypeName.DOUBLE, Double.BYTES)
+  .put(TypeName.DECIMAL, 32)
+  .put(TypeName.BOOLEAN, 1)
+  .put(TypeName.DATETIME, Long.BYTES)
+  .build();
+
   private static final BitSetCoder nullListCoder = BitSetCoder.of();
 
-  private RowType rowType;
-  private List coders;
+  private Schema schema;
+
+  /**
+   * Returns the coder used for a given primitive type.
+   */
+  public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+    return (Coder<T>) CODER_MAP.get(typeName);
+  }
+
+  /**
+   * Return the estimated serialized size of a given row object.
+   */
+  public static long estimatedSizeBytes(Row row) {
+Schema schema = row.getSchema();
+int fieldCount = schema.getFieldCount();
+int bitmapSize = (((fieldCount - 1) >> 6) + 1) * 8;
 
-  private RowCoder(RowType rowType, List coders) {
-this.rowType = rowType;
-this.coders = coders;
+int fieldsSize = 0;
+for (int i = 0; i < schema.getFieldCount(); ++i) {
+  fieldsSize += estimatedSizeBytes(schema.getField(i).getTypeDescriptor(), 
row.getValue(i));
+}
+return bitmapSize + fieldsSize;
   }
 
-  public static RowCoder of(RowType rowType, List coderArray) {
-if (rowType.getFieldCount() != coderArray.size()) {
-  throw new IllegalArgumentException("Coder size doesn't match with field 
size");
+  private static long estimatedSizeBytes(FieldTypeDescriptor typeDescriptor, 
Object value) {
+switch (typeDescriptor.getType()) {
+  case ROW:
+return estimatedSizeBytes((Row) value);
+  case ARRAY:
+List list = (List) value;
+long listSizeBytes = 0;
+for (Object elem : list) {
+  listSizeBytes += 
estimatedSizeBytes(typeDescriptor.getComponentType(), elem);
+}
+return 4 + listSizeBytes;
+  case STRING:
+// Not always accurate - String.getBytes().length() would be more 
accurate here, but slower.
+return ((String) value).length();
+  default:
+return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getType());
 }
-return new RowCoder(rowType, coderArray);
   }
 
-  public RowType getRowType() {
-return rowType;
+  private RowCoder(Schema schema) {
+this.schema = schema;
+  }
+
+  public static RowCoder of(Schema schema) {
+return new RowCoder(schema);
+  }
+
+  public Schema getSchema() {
+return schema;
+  }
+
+  Coder getCoder(FieldTypeDescriptor fieldTypeDescriptor) {
+if (TypeName.ARRAY.equals(fieldTypeDescriptor.getType())) {
+   

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86868
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178678193
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive 
type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as 
nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of this 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86855=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86855
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178629710
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
 ##
 @@ -17,69 +17,146 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
 
 /**
  *  A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each 
element directly.
  */
 @Experimental
 public class RowCoder extends CustomCoder {
+  private static final Map CODER_MAP = 
ImmutableMap.builder()
+  .put(TypeName.BYTE, ByteCoder.of())
+  .put(TypeName.INT16, BigEndianShortCoder.of())
+  .put(TypeName.INT32, BigEndianIntegerCoder.of())
+  .put(TypeName.INT64, BigEndianLongCoder.of())
+  .put(TypeName.DECIMAL, BigDecimalCoder.of())
+  .put(TypeName.FLOAT, FloatCoder.of())
+  .put(TypeName.DOUBLE, DoubleCoder.of())
+  .put(TypeName.STRING, StringUtf8Coder.of())
+  .put(TypeName.DATETIME, InstantCoder.of())
+  .put(TypeName.BOOLEAN, BooleanCoder.of())
+  .build();
+
+  private static final Map ESTIMATED_FIELD_SIZES =
+  ImmutableMap.builder()
+  .put(TypeName.BYTE, Byte.BYTES)
+  .put(TypeName.INT16, Short.BYTES)
+  .put(TypeName.INT32, Integer.BYTES)
+  .put(TypeName.INT64, Long.BYTES)
+  .put(TypeName.FLOAT, Float.BYTES)
+  .put(TypeName.DOUBLE, Double.BYTES)
+  .put(TypeName.DECIMAL, 32)
+  .put(TypeName.BOOLEAN, 1)
+  .put(TypeName.DATETIME, Long.BYTES)
+  .build();
+
   private static final BitSetCoder nullListCoder = BitSetCoder.of();
 
-  private RowType rowType;
-  private List coders;
+  private Schema schema;
+
+  /**
+   * Returns the coder used for a given primitive type.
+   */
+  public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+    return (Coder<T>) CODER_MAP.get(typeName);
+  }
+
+  /**
+   * Return the estimated serialized size of a given row object.
+   */
+  public static long estimatedSizeBytes(Row row) {
 
 Review comment:
  Does this belong on `RowCoder`? Right now this method doesn't do anything related to encoding/decoding and only uses `Schema` and `Row`. Would it be better to have this method on `Schema`? Or perhaps a separate utility class, since it's not frequently used?
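
  For comparison, a rough sketch of the separate-utility option (the class name RowSizeEstimator is made up, and the per-type sizing table is elided): the walk only needs Schema and Row, nothing from the coder.

      final class RowSizeEstimator {
        private RowSizeEstimator() {}

        static long estimatedSizeBytes(org.apache.beam.sdk.values.Row row) {
          long size = 0;
          for (int i = 0; i < row.getSchema().getFieldCount(); ++i) {
            // per-field sizing (fixed widths for numerics, lengths for strings,
            // recursion for ROW/ARRAY) would mirror what the PR does in RowCoder
            size += estimatedSizeBytes(
                row.getSchema().getField(i).getTypeDescriptor(), row.getValue(i));
          }
          return size;
        }

        private static long estimatedSizeBytes(
            org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor type, Object value) {
          return 0; // sizing table elided in this sketch
        }
      }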


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86855)
Time Spent: 3h 20m  (was: 3h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86859
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178657027
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List STRING_TYPES = ImmutableList.of(STRING);
+public static final List DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List CONTAINER_TYPES = 
ImmutableList.of(ARRAY);
+public static final List COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
 
 Review comment:
   If we have 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86870=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86870
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178680847
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##
 @@ -75,191 +83,230 @@ public static Row nullRow(RowType rowType) {
* if type doesn't match.
*/
  public <T> T getValue(String fieldName) {
-return getValue(getRowType().indexOf(fieldName));
+return getValue(getSchema().indexOf(fieldName));
   }
 
   /**
* Get value by field index, {@link ClassCastException} is thrown
-   * if type doesn't match.
+   * if schema doesn't match.
*/
   @Nullable
  public <T> T getValue(int fieldIdx) {
 return (T) getValues().get(fieldIdx);
   }
 
   /**
-   * Get a {@link Byte} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#BYTE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Byte getByte(String fieldName) {
-return getValue(fieldName);
+  public byte getByte(String fieldName) {
+return getByte(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Short} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT16} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Short getShort(String fieldName) {
-return getValue(fieldName);
+  public short getInt16(String fieldName) {
+return getInt16(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Integer} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT32} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Integer getInteger(String fieldName) {
-return getValue(fieldName);
+  public int getInt32(String fieldName) {
+return getInt32(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Float} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT64} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Float getFloat(String fieldName) {
-return getValue(fieldName);
+  public long getInt64(String fieldName) {
+return getInt64(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Double} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DECIMAL} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Double getDouble(String fieldName) {
-return getValue(fieldName);
+  public BigDecimal getDecimal(String fieldName) {
+return getDecimal(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Long} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#FLOAT} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Long getLong(String fieldName) {
-return getValue(fieldName);
+  public float getFloat(String fieldName) {
+return getFloat(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link String} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DOUBLE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
+   */
+  public double getDouble(String fieldName) {
+return getDouble(getSchema().indexOf(fieldName));
+  }
+
+  /**
+   * Get a {@link TypeName#STRING} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
   public String getString(String fieldName) {
-return getValue(fieldName);
+return getString(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Date} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DATETIME} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Date getDate(String fieldName) {
-return getValue(fieldName);
+  public ReadableDateTime getDateTime(String fieldName) {
+return getDateTime(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86857=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86857
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178619006
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianShortCoder.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link BigEndianShortCoder} encodes {@link Short Shorts} in 2 bytes, big-endian.
+ */
+public class BigEndianShortCoder extends AtomicCoder<Short> {
+
+  public static BigEndianShortCoder of() {
+return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////
+
+  private static final BigEndianShortCoder INSTANCE = new BigEndianShortCoder();
+  private static final TypeDescriptor<Short> TYPE_DESCRIPTOR = new TypeDescriptor<Short>() {};
+
+  private BigEndianShortCoder() {}
+
+  @Override
+  public void encode(Short value, OutputStream outStream) throws IOException {
+if (value == null) {
+  throw new CoderException("cannot encode a null Short");
+}
+new DataOutputStream(outStream).writeShort(value);
 
 Review comment:
   I think it will make sense to refactor these coders eventually; I'd create a 
JIRA to:
   1. Stop allocating a new `DataOutputStream` for each value read/write. Maybe 
copy the implementation into a static utility class;
   2. Extract some base classes/builders. From the looks of it, a ton of code 
for numeric value coders is duplicated; we could probably simplify this with 
inheritance/composition, probably along the lines of:
   
   ```java
   
   class PrimitiveNumericCoder<T> {
 // all the duplicate code.
 // type descriptor should still work based on the current instance.
   }
   
   class BigEndianShortCoder extends PrimitiveNumericCoder<Short> {
  public BigEndianShortCoder() {
 super("Short", DataStreamUtils::encodeShort, 2);
  }
   }
   
   class FloatCoder extends PrimitiveNumericCoder<Float> {
  public FloatCoder() {
 super("Float", DataStreamUtils::encodeFloat, 4);
  }
   }
   ```
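
   For what it's worth, a minimal standalone sketch of that idea (a sketch only, deliberately 
not tied to Beam's actual `AtomicCoder`/`CoderException` APIs; the `Writer` interface and the 
`typeLabel`/`encodedSize` names are made up for illustration and do not appear in this PR):
   
   ```java
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   
   // Shared base: each concrete coder supplies only a label, a write function and a byte width.
   abstract class PrimitiveNumericCoder<T> {
     interface Writer<T> {
       void write(DataOutputStream out, T value) throws IOException;
     }
   
     private final String typeLabel;   // used in error messages
     private final Writer<T> writer;   // the only per-type piece of logic
     private final int encodedSize;    // fixed width of the encoding, in bytes
   
     protected PrimitiveNumericCoder(String typeLabel, Writer<T> writer, int encodedSize) {
       this.typeLabel = typeLabel;
       this.writer = writer;
       this.encodedSize = encodedSize;
     }
   
     public void encode(T value, OutputStream outStream) throws IOException {
       if (value == null) {
         throw new IOException("cannot encode a null " + typeLabel);
       }
       writer.write(new DataOutputStream(outStream), value);
     }
   
     public int getEncodedByteSize() {
       return encodedSize;
     }
   }
   
   class BigEndianShortCoder extends PrimitiveNumericCoder<Short> {
     BigEndianShortCoder() {
       super("Short", (out, value) -> out.writeShort(value), 2);
     }
   }
   
   class FloatCoder extends PrimitiveNumericCoder<Float> {
     FloatCoder() {
       super("Float", (out, value) -> out.writeFloat(value), 4);
     }
   }
   ```
   
   The real coders would also need `decode`, equality and type-descriptor plumbing, which is 
exactly the duplication being pointed at here.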


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86857)
Time Spent: 3.5h  (was: 3h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86866=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86866
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178679058
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List<Field> fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List<Field> fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List<TypeName> NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List<TypeName> STRING_TYPES = ImmutableList.of(STRING);
+public static final List<TypeName> DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List<TypeName> CONTAINER_TYPES = ImmutableList.of(ARRAY);
+public static final List<TypeName> COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of this 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86864=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86864
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178680493
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##
 @@ -75,191 +83,230 @@ public static Row nullRow(RowType rowType) {
* if type doesn't match.
*/
  public <T> T getValue(String fieldName) {
-return getValue(getRowType().indexOf(fieldName));
+return getValue(getSchema().indexOf(fieldName));
   }
 
   /**
* Get value by field index, {@link ClassCastException} is thrown
-   * if type doesn't match.
+   * if schema doesn't match.
*/
   @Nullable
   public  T getValue(int fieldIdx) {
 return (T) getValues().get(fieldIdx);
   }
 
   /**
-   * Get a {@link Byte} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#BYTE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Byte getByte(String fieldName) {
-return getValue(fieldName);
+  public byte getByte(String fieldName) {
+return getByte(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Short} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT16} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Short getShort(String fieldName) {
-return getValue(fieldName);
+  public short getInt16(String fieldName) {
+return getInt16(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Integer} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT32} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Integer getInteger(String fieldName) {
-return getValue(fieldName);
+  public int getInt32(String fieldName) {
+return getInt32(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Float} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#INT64} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Float getFloat(String fieldName) {
-return getValue(fieldName);
+  public long getInt64(String fieldName) {
+return getInt64(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Double} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DECIMAL} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Double getDouble(String fieldName) {
-return getValue(fieldName);
+  public BigDecimal getDecimal(String fieldName) {
+return getDecimal(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Long} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#FLOAT} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Long getLong(String fieldName) {
-return getValue(fieldName);
+  public float getFloat(String fieldName) {
+return getFloat(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link String} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DOUBLE} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
+   */
+  public double getDouble(String fieldName) {
+return getDouble(getSchema().indexOf(fieldName));
+  }
+
+  /**
+   * Get a {@link TypeName#STRING} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
   public String getString(String fieldName) {
-return getValue(fieldName);
+return getString(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link Date} value by field name, {@link ClassCastException} is 
thrown
-   * if type doesn't match.
+   * Get a {@link TypeName#DATETIME} value by field name, {@link 
IllegalStateException} is thrown
+   * if schema doesn't match.
*/
-  public Date getDate(String fieldName) {
-return getValue(fieldName);
+  public ReadableDateTime getDateTime(String fieldName) {
+return getDateTime(getSchema().indexOf(fieldName));
   }
 
   /**
-   * Get a {@link 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86867
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178656406
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 ##
 @@ -80,11 +81,11 @@ public void testSelectArrayValue() {
   public void testProjectArrayField() {
 PCollection<Row> input = pCollectionOf2Elements();
 
-RowType resultType =
+Schema resultType =
 RowSqlType
 .builder()
 .withIntegerField("f_int")
-.withArrayField("f_stringArr", SqlTypeCoders.VARCHAR)
+.withArrayField("f_stringArr", SqlTypeName.VARCHAR)
 
 Review comment:
   `RowSqlTypeBuilder` is part of Beam while Calcite is not, so we're mixing type 
systems by allowing usage of `TypeNames` and `SqlTypeNames` in the same context.
   
   From our conversation about this this morning, what I'm thinking is:
- one option is to completely remove the `RowSqlTypeBuilder`;
- add `Schema.builder().withInt32Field()` for field types supported in 
core and `.withField(FieldTypeDescriptor)`;
- define a `SqlTypes` class (expanded into a standalone toy sketch after this list):
   ```java
   class SqlTypes {
  public static FieldTypeDescriptor CHAR =
 TypeName.STRING.fieldTypeDescriptor().withMetadata("CHAR");
  public static FieldTypeDescriptor VARCHAR =
 TypeName.STRING.fieldTypeDescriptor().withMetadata("VARCHAR");
   }
   ```
- this way users can use 
`RowType.builder().withInt32Field("blah").withField(SqlTypes.CHAR, "blah2")`;
   
   Alternatively:
- don't remove the `RowSqlTypeBuilder`, but subclass the `RowType.Builder` 
adding the SQL-specific methods
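
   Expanding the `SqlTypes` bullet above into a standalone toy model may make the intent 
clearer: CHAR and VARCHAR both stay `STRING` at the core-type level and are told apart only 
by metadata. The `Toy*` classes below are simplified stand-ins written for illustration, not 
Beam's actual `Schema`/`FieldTypeDescriptor`:
   
   ```java
   import java.util.Objects;
   
   enum ToyTypeName { STRING, INT32, DATETIME }
   
   final class ToyFieldTypeDescriptor {
     private final ToyTypeName type;
     private final String metadata;  // e.g. "CHAR" or "VARCHAR" for SQL string variants
   
     private ToyFieldTypeDescriptor(ToyTypeName type, String metadata) {
       this.type = type;
       this.metadata = metadata;
     }
   
     static ToyFieldTypeDescriptor of(ToyTypeName type) {
       return new ToyFieldTypeDescriptor(type, null);
     }
   
     ToyFieldTypeDescriptor withMetadata(String metadata) {
       return new ToyFieldTypeDescriptor(type, metadata);
     }
   
     // Metadata participates in equality, so CHAR and VARCHAR remain distinct descriptors.
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ToyFieldTypeDescriptor)) {
         return false;
       }
       ToyFieldTypeDescriptor other = (ToyFieldTypeDescriptor) o;
       return type == other.type && Objects.equals(metadata, other.metadata);
     }
   
     @Override
     public int hashCode() {
       return Objects.hash(type, metadata);
     }
   }
   
   final class ToySqlTypes {
     static final ToyFieldTypeDescriptor CHAR =
         ToyFieldTypeDescriptor.of(ToyTypeName.STRING).withMetadata("CHAR");
     static final ToyFieldTypeDescriptor VARCHAR =
         ToyFieldTypeDescriptor.of(ToyTypeName.STRING).withMetadata("VARCHAR");
   }
   ```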


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86867)
Time Spent: 4h 50m  (was: 4h 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86861
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178678561
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List<Field> fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List<Field> fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List<TypeName> NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List<TypeName> STRING_TYPES = ImmutableList.of(STRING);
+public static final List<TypeName> DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List<TypeName> CONTAINER_TYPES = ImmutableList.of(ARRAY);
+public static final List<TypeName> COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of this 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86869
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178678436
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List<Field> fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List<Field> fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List<TypeName> NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List<TypeName> STRING_TYPES = ImmutableList.of(STRING);
+public static final List<TypeName> DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List<TypeName> CONTAINER_TYPES = ImmutableList.of(ARRAY);
+public static final List<TypeName> COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
+public boolean isStringType() {
+  return STRING_TYPES.contains(this);
+}
+public boolean isDateType() {
+  return DATE_TYPES.contains(this);
+}
+public boolean isContainerType() {
+  return CONTAINER_TYPES.contains(this);
+}
+public boolean isCompositeType() {
+  return COMPOSITE_TYPES.contains(this);
+}
+
+/** Returns a {@link FieldTypeDescriptor} representing this primitive type. */
+public FieldTypeDescriptor typeDescriptor() {
+  return FieldTypeDescriptor.of(this);
+}
+  }
+
+  /**
+   * A descriptor of a single field type. This is a recursive descriptor, as nested types are
+   * allowed.
+   */
+  @AutoValue
+  public abstract static class FieldTypeDescriptor implements Serializable {
+// Returns the type of this 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86858=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86858
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178677974
 
 

 ##
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
 ##
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link Schema} describes the fields in {@link Row}.
+ *
+ */
+@Experimental
+@AutoValue
+public abstract class Schema implements Serializable {
+  // A mapping between field names and indices.
+  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  public abstract List<Field> getFields();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+abstract Builder setFields(List<Field> fields);
+abstract Schema build();
+  }
+
+  public static Schema of(List<Field> fields) {
+return Schema.fromFields(fields);
+  }
+
+  public static Schema of(Field ... fields) {
+return Schema.of(Arrays.asList(fields));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+if (!(o instanceof Schema)) {
+  return false;
+}
+Schema other = (Schema) o;
+return Objects.equals(fieldIndices, other.fieldIndices)
+&& Objects.equals(getFields(), other.getFields());
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hash(fieldIndices, getFields());
+  }
+
+  /**
+   * An enumerated list of supported types.
+   */
+  public enum TypeName {
+BYTE,// One-byte signed integer.
+INT16,   // two-byte signed integer.
+INT32,   // four-byte signed integer.
+INT64,   // eight-byte signed integer.
+DECIMAL,  // Decimal integer
+FLOAT,
+DOUBLE,
+STRING,  // String.
+DATETIME, // Date and time.
+BOOLEAN,  // Boolean.
+ARRAY,
+ROW;// The field is itself a nested row.
+
+public static final List<TypeName> NUMERIC_TYPES = ImmutableList.of(
+BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
+public static final List<TypeName> STRING_TYPES = ImmutableList.of(STRING);
+public static final List<TypeName> DATE_TYPES = ImmutableList.of(DATETIME);
+public static final List<TypeName> CONTAINER_TYPES = ImmutableList.of(ARRAY);
+public static final List<TypeName> COMPOSITE_TYPES = ImmutableList.of(ROW);
+
+public boolean isNumericType() {
+  return NUMERIC_TYPES.contains(this);
+}
 
 Review comment:
   GitHub is not feeling well, so I may have posted this question in another 
thread. These methods are probably invoked a lot; should we change them to 
flags? E.g.:
   
   ```java
   enum TypeName {
 BYTE(true, false),
 STRING(false, true);
   
 private boolean isNumeric;
 private boolean isString;
   
 TypeName(boolean isNumeric, boolean isString) { ... }
   }
   ```
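
   Filled in, that flag-based variant might look roughly like the sketch below (a sketch of 
the suggestion only; the code in this PR keeps the `ImmutableList`-based checks). The flags 
would turn `isNumericType()`/`isStringType()` into plain field reads instead of 
`List.contains()` scans:
   
   ```java
   enum TypeName {
     BYTE(true, false),
     INT16(true, false),
     INT32(true, false),
     INT64(true, false),
     DECIMAL(true, false),
     FLOAT(true, false),
     DOUBLE(true, false),
     STRING(false, true),
     DATETIME(false, false),
     BOOLEAN(false, false),
     ARRAY(false, false),
     ROW(false, false);
   
     private final boolean isNumeric;
     private final boolean isString;
   
     TypeName(boolean isNumeric, boolean isString) {
       this.isNumeric = isNumeric;
       this.isString = isString;
     }
   
     public boolean isNumericType() {
       return isNumeric;
     }
   
     public boolean isStringType() {
       return isString;
     }
   }
   ```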


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86858)
 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86472=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86472
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487901
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 ##
 @@ -80,11 +81,11 @@ public void testSelectArrayValue() {
   public void testProjectArrayField() {
 PCollection<Row> input = pCollectionOf2Elements();
 
-RowType resultType =
+Schema resultType =
 RowSqlType
 .builder()
 .withIntegerField("f_int")
-.withArrayField("f_stringArr", SqlTypeCoders.VARCHAR)
+.withArrayField("f_stringArr", SqlTypeName.VARCHAR)
 
 Review comment:
   
   
   > **akedin** wrote:
   > I think that we should not use Calcite types in the public APIs. 
Especially when the schema is now in core.
   
   
   This is calling into the RowSqlType builder, whose job it is to convert 
using CalciteUtils. Alternatively we could eliminate that builder and have all 
callers just call the core builder using CalciteUtils - does that sound better?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86472)
Time Spent: 2h 40m  (was: 2.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86478=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86478
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487900
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
 ##
 @@ -44,14 +44,14 @@
 
   private static final Row NULL_INPUT_ROW = null;
   private static final BoundedWindow NULL_WINDOW = null;
-  private static final Date DATE = str2DateTime("1984-04-19 01:02:03");
+  private static final DateTime DATE = str2DateTime("1984-04-19 01:02:03");
 
-  private static final Date DATE_PLUS_15_SECONDS = new 
DateTime(DATE).plusSeconds(15).toDate();
-  private static final Date DATE_PLUS_10_MINUTES = new 
DateTime(DATE).plusMinutes(10).toDate();
-  private static final Date DATE_PLUS_7_HOURS = new 
DateTime(DATE).plusHours(7).toDate();
-  private static final Date DATE_PLUS_3_DAYS = new 
DateTime(DATE).plusDays(3).toDate();
-  private static final Date DATE_PLUS_2_MONTHS = new 
DateTime(DATE).plusMonths(2).toDate();
-  private static final Date DATE_PLUS_11_YEARS = new 
DateTime(DATE).plusYears(11).toDate();
+  private static final DateTime DATE_PLUS_15_SECONDS = new 
DateTime(DATE).plusSeconds(15);
 
 Review comment:
   
   
   > **akedin** wrote:
   > `DATE_PLUS_15_SECONDS  = DATE.plusSeconds()` should work
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86478)
Time Spent: 3h 10m  (was: 3h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86473=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86473
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487903
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
 ##
 @@ -153,6 +156,15 @@ public boolean accept() {
 }
   }
 
+  //
+  private static  T convertValue(T value, SqlTypeName typeName) {
+// TODO: We should just convert Calcite to use either Joda or Java8 time.
 
 Review comment:
   
   
   > **akedin** wrote:
   > is this still needed?
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86473)
Time Spent: 2h 50m  (was: 2h 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86474=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86474
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487906
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
 ##
 @@ -281,7 +281,7 @@ public void testIsNullAndIsNotNull() throws Exception {
   }
 
   @Override protected PCollection<Row> getTestPCollection() {
-RowType type = RowSqlType.builder()
+Schema type = RowSqlType.builder()
 
 Review comment:
   
   
   > **akedin** wrote:
   > Now that we have all type info in core, we should have the builder there 
as well: `Schema.builder().withInt32().build()`
   
   
   So the "extra" use for this class now is that it has builder methods 
specifically for SQL types (e.g. timestamp, date, varchar, char), and passes 
them through CalciteUtils to do the correct translation to schema. Seems less 
error prone to centralize this rather than to have everyone call into 
CalciteUtils.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86474)
Time Spent: 3h  (was: 2h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86476
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487907
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
 ##
 @@ -177,21 +171,30 @@ public AggregationAdaptor(List 
aggregationCalls, RowType sourceRo
   int refIndexKey = call.getArgList().get(0);
   int refIndexValue = call.getArgList().get(1);
 
+  FieldTypeDescriptor keyDescriptor =
+  sourceSchema.getField(refIndexKey).getTypeDescriptor();
   BeamSqlInputRefExpression sourceExpKey = new 
BeamSqlInputRefExpression(
-  CalciteUtils.getFieldCalciteType(sourceRowType, 
refIndexKey), refIndexKey);
+  CalciteUtils.toSqlTypeName(keyDescriptor.getType(), 
keyDescriptor.getMetadata()),
 
 Review comment:
   
   
   > **akedin** wrote:
   > I would just pass the whole `FieldTypeDescriptor` into the 
`CalciteUtils.toSqlTypeName()`
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86476)
Time Spent: 3h 10m  (was: 3h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86471=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86471
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487898
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 ##
 @@ -163,13 +165,14 @@ private static Table mockTable(String name, String type, 
String comment, JSONObj
 .columns(ImmutableList.of(
 Column.builder()
 .name("id")
-.coder(INTEGER)
+.typeDescriptor(FieldTypeDescriptor.of(FieldType.INT32))
 .primaryKey(false)
 .comment("id")
 .build(),
 Column.builder()
 .name("name")
-.coder(VARCHAR)
+.typeDescriptor(FieldTypeDescriptor.of(FieldType.STRING)
 
 Review comment:
   
   
   > **akedin** wrote:
   > Can we reverse this logic? I.e.:
   > 
   > ```java
   > FieldType.STRING.typeDescriptor();
   > FieldType.INT32.typeDescriptor();
   > FieldType.STRING.typeDescriptor().withMetadata(...);
   > ```
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86471)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86477=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86477
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487910
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 ##
 @@ -154,17 +151,14 @@ private static Row combineTwoRowsIntoOne(Row leftRow,
*/
   private static Row combineTwoRowsIntoOneHelper(Row leftRow, Row rightRow) {
 // build the type
-List<String> names = new ArrayList<>(leftRow.getFieldCount() + 
rightRow.getFieldCount());
-names.addAll(leftRow.getRowType().getFieldNames());
-names.addAll(rightRow.getRowType().getFieldNames());
-
-List<Coder> types = new ArrayList<>(leftRow.getFieldCount() + 
rightRow.getFieldCount());
-types.addAll(leftRow.getRowType().getRowCoder().getCoders());
-types.addAll(rightRow.getRowType().getRowCoder().getCoders());
-RowType type = RowType.fromNamesAndCoders(names, types);
+List<Field> fields = new ArrayList<>(
+leftRow.getFieldCount() + rightRow.getFieldCount());
+fields.addAll(leftRow.getSchema().getFields());
+fields.addAll(rightRow.getSchema().getFields());
+Schema type = Schema.of(fields);
 
 Review comment:
   
   
   > **akedin** wrote:
   > Couple of questions related to this piece:
   >  1. Should schema internally support operations like this? E.g. 
`schema1.join(schema2)`?
   >  2. It probably makes sense to cache these schemas eventually, I would 
leave a TODO;
   >  3. Even if schema doesn't support complex operations, probably makes 
sense to have a complex builder: 
`Schema.builder().addFields(schema1.getFields()).addFields(schema2.getFields()).build()`;
   
   
   Yes, at the very least we should have helper functions. Could be on schema, 
could be on SchemaBuilder, or could be a separate SchemaHelpers class.
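
   As one concrete shape for such a helper, a hypothetical `SchemaHelpers`-style sketch, 
using only the `Schema.of(...)`, `getFields()` and `Field` names visible in the diff above 
(it deliberately does not check for duplicate field names):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class SchemaHelpers {
     /** Concatenates the fields of two schemas into a new schema, left fields first. */
     static Schema join(Schema left, Schema right) {
       List<Field> fields =
           new ArrayList<>(left.getFields().size() + right.getFields().size());
       fields.addAll(left.getFields());
       fields.addAll(right.getFields());
       return Schema.of(fields);
     }
   }
   ```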


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86477)
Time Spent: 3h 10m  (was: 3h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86469
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487902
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 ##
 @@ -51,44 +51,44 @@
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
-  public static final Date FIRST_DATE = new Date(1);
-  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
-  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 
+ 1);
+  public static final DateTime FIRST_DATE = new DateTime(1);
+  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+  public static final DateTime THIRD_DATE = new DateTime(1 + 3600 * 1000 + 
3600 * 1000 + 1);
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
   @BeforeClass
   public static void prepare() {
 BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
 .of(
-SqlTypeCoders.INTEGER, "order_id",
-SqlTypeCoders.INTEGER, "site_id",
-SqlTypeCoders.INTEGER, "price",
-SqlTypeCoders.TIMESTAMP, "order_time"
+FieldType.INT32, "order_id",
+FieldType.INT32, "site_id",
+FieldType.INT32, "price",
+FieldType.INT32, "order_time"
 
 Review comment:
   
   
   > **akedin** wrote:
   > Why not `FieldType.DATETIME` ?
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86469)
Time Spent: 2.5h  (was: 2h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86475=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86475
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487909
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
 ##
 @@ -153,19 +146,19 @@ public void buildRunAndCheck() {
   PCollection<Row> inputCollection = getTestPCollection();
   System.out.println("SQL:>\n" + getSql());
   try {
-RowType rowType =
+Schema schema =
 exps.stream()
-.map(exp -> RowType.newField(
+.map(exp -> Schema.Field.of(
 
 Review comment:
   
   
   > **akedin** wrote:
   > `Schema.newField()` ?
   
   
   We could easily add it, but it doesn't seem like it would save a whole lot.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86475)
Time Spent: 3h  (was: 2h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86479=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86479
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487905
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
 ##
 @@ -177,21 +171,30 @@ public AggregationAdaptor(List 
aggregationCalls, RowType sourceRo
   int refIndexKey = call.getArgList().get(0);
   int refIndexValue = call.getArgList().get(1);
 
+  FieldTypeDescriptor keyDescriptor =
+  sourceSchema.getField(refIndexKey).getTypeDescriptor();
   BeamSqlInputRefExpression sourceExpKey = new 
BeamSqlInputRefExpression(
-  CalciteUtils.getFieldCalciteType(sourceRowType, 
refIndexKey), refIndexKey);
+  CalciteUtils.toSqlTypeName(keyDescriptor.getType(), 
keyDescriptor.getMetadata()),
+  refIndexKey);
+
+  FieldTypeDescriptor valueDescriptor =
+  sourceSchema.getField(refIndexValue).getTypeDescriptor();
   BeamSqlInputRefExpression sourceExpValue = new 
BeamSqlInputRefExpression(
-  CalciteUtils.getFieldCalciteType(sourceRowType, 
refIndexValue), refIndexValue);
+  CalciteUtils.toSqlTypeName(valueDescriptor.getType(), 
valueDescriptor.getMetadata()),
+  refIndexValue);
 
   sourceFieldExps.add(KV.of(sourceExpKey, sourceExpValue));
 } else {
   int refIndex = call.getArgList().size() > 0 ? 
call.getArgList().get(0) : 0;
+  FieldTypeDescriptor typeDescriptor = 
sourceSchema.getField(refIndex).getTypeDescriptor();
 
 Review comment:
   
   
   > **akedin** wrote:
   > It's unclear where we're supposed to be using `FieldTypeDescriptor` vs 
`FieldType`. Can they be combined? So that, for example, all fields in 
`FieldType` become instances of `FieldTypeDescriptor`. Do we need both?
   
   
   FieldType is an enum that identifies the type of a row field. FieldTypeDescriptor 
contains the extra information needed to resolve the type (e.g. the type of the 
component element or the schema of a nested row).
   
   It would be possible to merge the classes (since Java enums are just 
classes), but it's better to maintain a clean separation between primitive 
field types and the recursive spec needed to resolve a type. Also, while it's 
fine to add some extra convenience functionality to an enum class, making it a 
fully recursive type seems like an abuse of enums.
   
   For reference, the two classes are _roughly_ equivalent to the SqlTypeName 
and RelDataType in Calcite. Would it be clearer if I renamed FieldType -> 
TypeName?
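   To make the distinction concrete, here is a toy sketch of the shape being described 
(local stand-ins only, not the actual Beam classes): a flat enum naming the primitive 
kind next to the recursive descriptor that resolves it.

```java
public class FieldTypeSketch {

  // Flat enum: just names the primitive kind of a field.
  enum FieldType { INT32, STRING, DATETIME, ARRAY, ROW }

  // Recursive descriptor: carries whatever is needed to fully resolve a type,
  // e.g. the component type of an ARRAY (a nested row schema would go here too).
  static final class FieldTypeDescriptor {
    final FieldType type;
    final FieldTypeDescriptor componentType; // non-null only for ARRAY

    private FieldTypeDescriptor(FieldType type, FieldTypeDescriptor componentType) {
      this.type = type;
      this.componentType = componentType;
    }

    static FieldTypeDescriptor of(FieldType type) {
      return new FieldTypeDescriptor(type, null);
    }

    FieldTypeDescriptor withComponentType(FieldTypeDescriptor component) {
      return new FieldTypeDescriptor(type, component);
    }
  }

  public static void main(String[] args) {
    // ARRAY<INT32>: the enum alone cannot express the element type, which is
    // exactly why the recursive descriptor exists alongside it.
    FieldTypeDescriptor arrayOfInts =
        FieldTypeDescriptor.of(FieldType.ARRAY)
            .withComponentType(FieldTypeDescriptor.of(FieldType.INT32));
    System.out.println(arrayOfInts.type + " of " + arrayOfInts.componentType.type);
  }
}
```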


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86479)
Time Spent: 3h 10m  (was: 3h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86480=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86480
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487904
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
 ##
 @@ -43,13 +42,15 @@
   private static final Row NULL_ROW = null;
   private static final BoundedWindow NULL_WINDOW = null;
 
-  private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
-  private static final Date DATE_MINUS_2_SEC = new 
DateTime(DATE).minusSeconds(2).toDate();
-  private static final Date DATE_MINUS_3_MIN = new 
DateTime(DATE).minusMinutes(3).toDate();
-  private static final Date DATE_MINUS_4_HOURS = new 
DateTime(DATE).minusHours(4).toDate();
-  private static final Date DATE_MINUS_7_DAYS = new 
DateTime(DATE).minusDays(7).toDate();
-  private static final Date DATE_MINUS_2_MONTHS = new 
DateTime(DATE).minusMonths(2).toDate();
-  private static final Date DATE_MINUS_1_YEAR = new 
DateTime(DATE).minusYears(1).toDate();
+  private static final DateTime DATE = new DateTime()
+  .withDate(2017, 3, 4)
 
 Review comment:
   
   
   > **akedin** wrote:
   > formatting?
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86480)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-04-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86470=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86470
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 02/Apr/18 05:08
Start Date: 02/Apr/18 05:08
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178487899
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 ##
 @@ -38,149 +37,152 @@
  */
 public class CalciteUtils {
   private static final long UNLIMITED_ARRAY_SIZE = -1L;
-  private static final BiMap<SqlTypeCoder, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
-  ImmutableBiMap.<SqlTypeCoder, SqlTypeName>builder()
-  .put(SqlTypeCoders.TINYINT, SqlTypeName.TINYINT)
-  .put(SqlTypeCoders.SMALLINT, SqlTypeName.SMALLINT)
-  .put(SqlTypeCoders.INTEGER, SqlTypeName.INTEGER)
-  .put(SqlTypeCoders.BIGINT, SqlTypeName.BIGINT)
+  private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
+  ImmutableBiMap.<FieldType, SqlTypeName>builder()
+  .put(FieldType.BYTE, SqlTypeName.TINYINT)
+  .put(FieldType.INT16, SqlTypeName.SMALLINT)
+  .put(FieldType.INT32, SqlTypeName.INTEGER)
+  .put(FieldType.INT64, SqlTypeName.BIGINT)
 
-  .put(SqlTypeCoders.FLOAT, SqlTypeName.FLOAT)
-  .put(SqlTypeCoders.DOUBLE, SqlTypeName.DOUBLE)
+  .put(FieldType.FLOAT, SqlTypeName.FLOAT)
+  .put(FieldType.DOUBLE, SqlTypeName.DOUBLE)
 
-  .put(SqlTypeCoders.DECIMAL, SqlTypeName.DECIMAL)
+  .put(FieldType.DECIMAL, SqlTypeName.DECIMAL)
 
-  .put(SqlTypeCoders.CHAR, SqlTypeName.CHAR)
-  .put(SqlTypeCoders.VARCHAR, SqlTypeName.VARCHAR)
+  .put(FieldType.STRING, SqlTypeName.VARCHAR)
 
-  .put(SqlTypeCoders.DATE, SqlTypeName.DATE)
-  .put(SqlTypeCoders.TIME, SqlTypeName.TIME)
-  .put(SqlTypeCoders.TIMESTAMP, SqlTypeName.TIMESTAMP)
+  .put(FieldType.DATETIME, SqlTypeName.TIMESTAMP)
 
-  .put(SqlTypeCoders.BOOLEAN, SqlTypeName.BOOLEAN)
-  .build();
+  .put(FieldType.BOOLEAN, SqlTypeName.BOOLEAN)
 
-  private static final BiMap<SqlTypeName, SqlTypeCoder> CALCITE_TO_BEAM_TYPE_MAPPING =
+  .put(FieldType.ARRAY, SqlTypeName.ARRAY)
+  .put(FieldType.ROW, SqlTypeName.ROW)
 
 Review comment:
   
   
   > **akedin** wrote:
   > Not sure if this will work, but I would try to change this map to 
`BiMap<FieldTypeDescriptor, SqlTypeName>`, so that it is possible to do it like 
this:
   > 
   > ```java
   >   BEAM_TO_CALCITE_MAPPING = 
   > ...
   > .put(FieldType.DECIMAL.typeDescriptor(), SqlTypeName.DECIMAL)
   > .put(FieldType.STRING.typeDescriptor().withMetadata(CHAR_METADATA), 
SqlTypeName.CHAR)
   > .put(FieldType.STRING.typeDescriptor().withMetadata(VARCHAR_METADATA), 
SqlTypeName.VARCHAR)
   > .put(FieldType.DATETIME.typeDescriptor().withMetadata(TIME_METADATA), 
SqlTypeName.TIME)
   > 
.put(FieldType.DATETIME.typeDescriptor().withMetadata(TIMESTAMP_METADATA), 
SqlTypeName.TIMESTAMP)
   >  ...
   > ```
   > 
   > Then you probably don't need to look up metadata separately each time and 
now have a 1-1 mapping
   
   
   Good idea: done. It required a bit more work, since we still need to map a Beam 
type to a SQL type when the metadata isn't set (in that case we fall back to a 
default mapping, e.g. String -> VARCHAR).
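   A rough sketch of the two-step lookup this implies, with toy stand-ins for the Beam 
and Calcite types (none of the names below are the real API): metadata-qualified 
entries are looked up first, and the plain FieldType falls back to a default 
SqlTypeName when no metadata is set.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeMappingSketch {
  enum FieldType { STRING, INT32, DATETIME }
  enum SqlTypeName { CHAR, VARCHAR, INTEGER, TIME, TIMESTAMP }

  // Exact entries keyed by "type:metadata"; toy replacement for a map keyed by
  // a metadata-carrying type descriptor.
  private static final Map<String, SqlTypeName> EXACT = new HashMap<>();
  // Fallback used when a field carries no metadata.
  private static final Map<FieldType, SqlTypeName> DEFAULTS = new HashMap<>();

  static {
    EXACT.put("STRING:CHAR", SqlTypeName.CHAR);
    EXACT.put("STRING:VARCHAR", SqlTypeName.VARCHAR);
    EXACT.put("DATETIME:TIME", SqlTypeName.TIME);
    EXACT.put("DATETIME:TIMESTAMP", SqlTypeName.TIMESTAMP);

    DEFAULTS.put(FieldType.STRING, SqlTypeName.VARCHAR);     // String -> VARCHAR by default
    DEFAULTS.put(FieldType.DATETIME, SqlTypeName.TIMESTAMP); // DateTime -> TIMESTAMP by default
    DEFAULTS.put(FieldType.INT32, SqlTypeName.INTEGER);
  }

  static SqlTypeName toSqlTypeName(FieldType type, String metadata) {
    if (metadata != null) {
      SqlTypeName exact = EXACT.get(type + ":" + metadata);
      if (exact != null) {
        return exact;
      }
    }
    return DEFAULTS.get(type);
  }

  public static void main(String[] args) {
    System.out.println(toSqlTypeName(FieldType.STRING, null));   // VARCHAR
    System.out.println(toSqlTypeName(FieldType.STRING, "CHAR")); // CHAR
  }
}
```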


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86470)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> 

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86184=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86184
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 21:59
Start Date: 30/Mar/18 21:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178376529
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
 ##
 @@ -177,21 +171,30 @@ public AggregationAdaptor(List 
aggregationCalls, RowType sourceRo
   int refIndexKey = call.getArgList().get(0);
   int refIndexValue = call.getArgList().get(1);
 
+  FieldTypeDescriptor keyDescriptor =
+  sourceSchema.getField(refIndexKey).getTypeDescriptor();
   BeamSqlInputRefExpression sourceExpKey = new 
BeamSqlInputRefExpression(
-  CalciteUtils.getFieldCalciteType(sourceRowType, 
refIndexKey), refIndexKey);
+  CalciteUtils.toSqlTypeName(keyDescriptor.getType(), 
keyDescriptor.getMetadata()),
+  refIndexKey);
+
+  FieldTypeDescriptor valueDescriptor =
+  sourceSchema.getField(refIndexValue).getTypeDescriptor();
   BeamSqlInputRefExpression sourceExpValue = new 
BeamSqlInputRefExpression(
-  CalciteUtils.getFieldCalciteType(sourceRowType, 
refIndexValue), refIndexValue);
+  CalciteUtils.toSqlTypeName(valueDescriptor.getType(), 
valueDescriptor.getMetadata()),
+  refIndexValue);
 
   sourceFieldExps.add(KV.of(sourceExpKey, sourceExpValue));
 } else {
   int refIndex = call.getArgList().size() > 0 ? 
call.getArgList().get(0) : 0;
+  FieldTypeDescriptor typeDescriptor = 
sourceSchema.getField(refIndex).getTypeDescriptor();
 
 Review comment:
   It's unclear where we're supposed to be using `FieldTypeDescriptor` vs 
`FieldType`. Can they be combined? So that, for example, all fields in 
`FieldType` become instances of `FieldTypeDescriptor`. Do we need both?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86184)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86182=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86182
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 21:59
Start Date: 30/Mar/18 21:59
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178372816
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 ##
 @@ -154,17 +151,14 @@ private static Row combineTwoRowsIntoOne(Row leftRow,
*/
   private static Row combineTwoRowsIntoOneHelper(Row leftRow, Row rightRow) {
 // build the type
-List<String> names = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
-names.addAll(leftRow.getRowType().getFieldNames());
-names.addAll(rightRow.getRowType().getFieldNames());
-
-List<Coder> types = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
-types.addAll(leftRow.getRowType().getRowCoder().getCoders());
-types.addAll(rightRow.getRowType().getRowCoder().getCoders());
-RowType type = RowType.fromNamesAndCoders(names, types);
+List<Schema.Field> fields = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
+fields.addAll(leftRow.getSchema().getFields());
+fields.addAll(rightRow.getSchema().getFields());
+Schema type = Schema.of(fields);
 
 Review comment:
   Couple of questions related to this piece:
1. Should schema internally support operations like this? E.g. 
`schema1.join(schema2)`?
2. It probably makes sense to cache these schemas eventually, I would leave 
a TODO;
3. Even if schema doesn't support complex operations, probably makes sense 
to have a complex builder: 
`Schema.builder().addFields(schema1.getFields()).addFields(schema2.getFields()).build()`;
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86182)
Time Spent: 2h 10m  (was: 2h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86146=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86146
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 20:16
Start Date: 30/Mar/18 20:16
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178369253
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 ##
 @@ -38,149 +37,152 @@
  */
 public class CalciteUtils {
   private static final long UNLIMITED_ARRAY_SIZE = -1L;
-  private static final BiMap<SqlTypeCoder, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
-  ImmutableBiMap.<SqlTypeCoder, SqlTypeName>builder()
-  .put(SqlTypeCoders.TINYINT, SqlTypeName.TINYINT)
-  .put(SqlTypeCoders.SMALLINT, SqlTypeName.SMALLINT)
-  .put(SqlTypeCoders.INTEGER, SqlTypeName.INTEGER)
-  .put(SqlTypeCoders.BIGINT, SqlTypeName.BIGINT)
+  private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
+  ImmutableBiMap.<FieldType, SqlTypeName>builder()
+  .put(FieldType.BYTE, SqlTypeName.TINYINT)
+  .put(FieldType.INT16, SqlTypeName.SMALLINT)
+  .put(FieldType.INT32, SqlTypeName.INTEGER)
+  .put(FieldType.INT64, SqlTypeName.BIGINT)
 
-  .put(SqlTypeCoders.FLOAT, SqlTypeName.FLOAT)
-  .put(SqlTypeCoders.DOUBLE, SqlTypeName.DOUBLE)
+  .put(FieldType.FLOAT, SqlTypeName.FLOAT)
+  .put(FieldType.DOUBLE, SqlTypeName.DOUBLE)
 
-  .put(SqlTypeCoders.DECIMAL, SqlTypeName.DECIMAL)
+  .put(FieldType.DECIMAL, SqlTypeName.DECIMAL)
 
-  .put(SqlTypeCoders.CHAR, SqlTypeName.CHAR)
-  .put(SqlTypeCoders.VARCHAR, SqlTypeName.VARCHAR)
+  .put(FieldType.STRING, SqlTypeName.VARCHAR)
 
-  .put(SqlTypeCoders.DATE, SqlTypeName.DATE)
-  .put(SqlTypeCoders.TIME, SqlTypeName.TIME)
-  .put(SqlTypeCoders.TIMESTAMP, SqlTypeName.TIMESTAMP)
+  .put(FieldType.DATETIME, SqlTypeName.TIMESTAMP)
 
-  .put(SqlTypeCoders.BOOLEAN, SqlTypeName.BOOLEAN)
-  .build();
+  .put(FieldType.BOOLEAN, SqlTypeName.BOOLEAN)
 
-  private static final BiMap<SqlTypeName, SqlTypeCoder> CALCITE_TO_BEAM_TYPE_MAPPING =
+  .put(FieldType.ARRAY, SqlTypeName.ARRAY)
+  .put(FieldType.ROW, SqlTypeName.ROW)
 
 Review comment:
   Not sure if this will work, but I would try to change this map to 
`BiMap<FieldTypeDescriptor, SqlTypeName>`, so that it is possible to do it like 
this:
   
   ```java
 BEAM_TO_CALCITE_MAPPING = 
   ...
   .put(FieldType.DECIMAL.typeDescriptor(), SqlTypeName.DECIMAL)
   .put(FieldType.STRING.typeDescriptor().withMetadata(CHAR_METADATA), 
SqlTypeName.CHAR)
   .put(FieldType.STRING.typeDescriptor().withMetadata(VARCHAR_METADATA), 
SqlTypeName.VARCHAR)
   .put(FieldType.DATETIME.typeDescriptor().withMetadata(TIME_METADATA), 
SqlTypeName.TIME)
   
.put(FieldType.DATETIME.typeDescriptor().withMetadata(TIMESTAMP_METADATA), 
SqlTypeName.TIMESTAMP)
...
   ```
   
   Then you probably don't need to look up metadata separately each time and 
now have a 1-1 mapping


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86146)
Time Spent: 2h  (was: 1h 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA

[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86145=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86145
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 20:15
Start Date: 30/Mar/18 20:15
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178369253
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 ##
 @@ -38,149 +37,152 @@
  */
 public class CalciteUtils {
   private static final long UNLIMITED_ARRAY_SIZE = -1L;
-  private static final BiMap<SqlTypeCoder, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
-  ImmutableBiMap.<SqlTypeCoder, SqlTypeName>builder()
-  .put(SqlTypeCoders.TINYINT, SqlTypeName.TINYINT)
-  .put(SqlTypeCoders.SMALLINT, SqlTypeName.SMALLINT)
-  .put(SqlTypeCoders.INTEGER, SqlTypeName.INTEGER)
-  .put(SqlTypeCoders.BIGINT, SqlTypeName.BIGINT)
+  private static final BiMap<FieldType, SqlTypeName> BEAM_TO_CALCITE_TYPE_MAPPING =
+  ImmutableBiMap.<FieldType, SqlTypeName>builder()
+  .put(FieldType.BYTE, SqlTypeName.TINYINT)
+  .put(FieldType.INT16, SqlTypeName.SMALLINT)
+  .put(FieldType.INT32, SqlTypeName.INTEGER)
+  .put(FieldType.INT64, SqlTypeName.BIGINT)
 
-  .put(SqlTypeCoders.FLOAT, SqlTypeName.FLOAT)
-  .put(SqlTypeCoders.DOUBLE, SqlTypeName.DOUBLE)
+  .put(FieldType.FLOAT, SqlTypeName.FLOAT)
+  .put(FieldType.DOUBLE, SqlTypeName.DOUBLE)
 
-  .put(SqlTypeCoders.DECIMAL, SqlTypeName.DECIMAL)
+  .put(FieldType.DECIMAL, SqlTypeName.DECIMAL)
 
-  .put(SqlTypeCoders.CHAR, SqlTypeName.CHAR)
-  .put(SqlTypeCoders.VARCHAR, SqlTypeName.VARCHAR)
+  .put(FieldType.STRING, SqlTypeName.VARCHAR)
 
-  .put(SqlTypeCoders.DATE, SqlTypeName.DATE)
-  .put(SqlTypeCoders.TIME, SqlTypeName.TIME)
-  .put(SqlTypeCoders.TIMESTAMP, SqlTypeName.TIMESTAMP)
+  .put(FieldType.DATETIME, SqlTypeName.TIMESTAMP)
 
-  .put(SqlTypeCoders.BOOLEAN, SqlTypeName.BOOLEAN)
-  .build();
+  .put(FieldType.BOOLEAN, SqlTypeName.BOOLEAN)
 
-  private static final BiMap<SqlTypeName, SqlTypeCoder> CALCITE_TO_BEAM_TYPE_MAPPING =
+  .put(FieldType.ARRAY, SqlTypeName.ARRAY)
+  .put(FieldType.ROW, SqlTypeName.ROW)
 
 Review comment:
   Not sure if this will work, but I would try to change this map to 
`BiMap<FieldTypeDescriptor, SqlTypeName>`, so that it is possible to do it like 
this:
   
   ```java
 BEAM_TO_CALCITE_MAPPING = 
   ...
   .put(FieldType.DECIMAL.typeDescriptor(), SqlTypeName.DECIMAL)
   .put(FieldType.STRING.withMetadata(CHAR_METADATA), SqlTypeName.CHAR)
   .put(FieldType.STRING.withMetadata(VARCHAR_METADATA), 
SqlTypeName.VARCHAR)
   .put(FieldType.DATETIME.withMetadata(TIME_METADATA), SqlTypeName.TIME)
   .put(FieldType.DATETIME.withMetadata(TIMESTAMP_METADATA), 
SqlTypeName.TIMESTAMP)
...
   ```
   
   Then you probably don't need to look up metadata separately each time and 
now have a 1-1 mapping


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86145)
Time Spent: 1h 50m  (was: 1h 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86129=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86129
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178347126
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 ##
 @@ -80,11 +81,11 @@ public void testSelectArrayValue() {
   public void testProjectArrayField() {
 PCollection<Row> input = pCollectionOf2Elements();
 
-RowType resultType =
+Schema resultType =
 RowSqlType
 .builder()
 .withIntegerField("f_int")
-.withArrayField("f_stringArr", SqlTypeCoders.VARCHAR)
+.withArrayField("f_stringArr", SqlTypeName.VARCHAR)
 
 Review comment:
   I think we should not use Calcite types in the public APIs, especially now that 
the schema lives in core.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86129)
Time Spent: 1h 40m  (was: 1.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86126=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86126
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178338518
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 ##
 @@ -163,13 +165,14 @@ private static Table mockTable(String name, String type, 
String comment, JSONObj
 .columns(ImmutableList.of(
 Column.builder()
 .name("id")
-.coder(INTEGER)
+.typeDescriptor(FieldTypeDescriptor.of(FieldType.INT32))
 .primaryKey(false)
 .comment("id")
 .build(),
 Column.builder()
 .name("name")
-.coder(VARCHAR)
+.typeDescriptor(FieldTypeDescriptor.of(FieldType.STRING)
 
 Review comment:
   Can we reverse this logic? I.e.:
   
   ```java
   FieldType.STRING.typeDescriptor();
   FieldType.INT32.typeDescriptor();
   FieldType.STRING.typeDescriptor().withMetadata(...);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86126)
Time Spent: 1h 20m  (was: 1h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86127=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86127
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178340147
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java
 ##
 @@ -44,14 +44,14 @@
 
   private static final Row NULL_INPUT_ROW = null;
   private static final BoundedWindow NULL_WINDOW = null;
-  private static final Date DATE = str2DateTime("1984-04-19 01:02:03");
+  private static final DateTime DATE = str2DateTime("1984-04-19 01:02:03");
 
-  private static final Date DATE_PLUS_15_SECONDS = new 
DateTime(DATE).plusSeconds(15).toDate();
-  private static final Date DATE_PLUS_10_MINUTES = new 
DateTime(DATE).plusMinutes(10).toDate();
-  private static final Date DATE_PLUS_7_HOURS = new 
DateTime(DATE).plusHours(7).toDate();
-  private static final Date DATE_PLUS_3_DAYS = new 
DateTime(DATE).plusDays(3).toDate();
-  private static final Date DATE_PLUS_2_MONTHS = new 
DateTime(DATE).plusMonths(2).toDate();
-  private static final Date DATE_PLUS_11_YEARS = new 
DateTime(DATE).plusYears(11).toDate();
+  private static final DateTime DATE_PLUS_15_SECONDS = new 
DateTime(DATE).plusSeconds(15);
 
 Review comment:
   `DATE_PLUS_15_SECONDS = DATE.plusSeconds(15)` should work
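   For context, Joda-Time's `DateTime` is immutable, so the arithmetic methods already 
return new instances; once the constants are kept as `DateTime`, the `new DateTime(DATE)` 
copy plus `.toDate()` round-trip is unnecessary. A small standalone example:

```java
import org.joda.time.DateTime;

public class JodaPlusExample {
  public static void main(String[] args) {
    DateTime date = new DateTime(1984, 4, 19, 1, 2, 3);
    // plusSeconds returns a new DateTime; 'date' itself is unchanged.
    DateTime datePlus15Seconds = date.plusSeconds(15);
    System.out.println(date);              // 1984-04-19T01:02:03.000...
    System.out.println(datePlus15Seconds); // 1984-04-19T01:02:18.000...
  }
}
```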


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86127)
Time Spent: 1h 20m  (was: 1h 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86128=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86128
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178338665
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpressionTest.java
 ##
 @@ -43,13 +42,15 @@
   private static final Row NULL_ROW = null;
   private static final BoundedWindow NULL_WINDOW = null;
 
-  private static final Date DATE = new Date(2017, 3, 4, 3, 2, 1);
-  private static final Date DATE_MINUS_2_SEC = new 
DateTime(DATE).minusSeconds(2).toDate();
-  private static final Date DATE_MINUS_3_MIN = new 
DateTime(DATE).minusMinutes(3).toDate();
-  private static final Date DATE_MINUS_4_HOURS = new 
DateTime(DATE).minusHours(4).toDate();
-  private static final Date DATE_MINUS_7_DAYS = new 
DateTime(DATE).minusDays(7).toDate();
-  private static final Date DATE_MINUS_2_MONTHS = new 
DateTime(DATE).minusMonths(2).toDate();
-  private static final Date DATE_MINUS_1_YEAR = new 
DateTime(DATE).minusYears(1).toDate();
+  private static final DateTime DATE = new DateTime()
+  .withDate(2017, 3, 4)
 
 Review comment:
   formatting?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86128)
Time Spent: 1.5h  (was: 1h 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86098=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86098
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:53
Start Date: 30/Mar/18 17:53
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178337973
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 ##
 @@ -51,44 +51,44 @@
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
-  public static final Date FIRST_DATE = new Date(1);
-  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
-  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 
+ 1);
+  public static final DateTime FIRST_DATE = new DateTime(1);
+  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+  public static final DateTime THIRD_DATE = new DateTime(1 + 3600 * 1000 + 
3600 * 1000 + 1);
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
   @BeforeClass
   public static void prepare() {
 BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
 .of(
-SqlTypeCoders.INTEGER, "order_id",
-SqlTypeCoders.INTEGER, "site_id",
-SqlTypeCoders.INTEGER, "price",
-SqlTypeCoders.TIMESTAMP, "order_time"
+FieldType.INT32, "order_id",
+FieldType.INT32, "site_id",
+FieldType.INT32, "price",
+FieldType.INT32, "order_time"
 
 Review comment:
   Why not `FieldType.DATETIME` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86098)
Time Spent: 1h 10m  (was: 1h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86095=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86095
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:44
Start Date: 30/Mar/18 17:44
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178336300
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
 ##
 @@ -153,19 +146,19 @@ public void buildRunAndCheck() {
   PCollection<Row> inputCollection = getTestPCollection();
   System.out.println("SQL:>\n" + getSql());
   try {
-RowType rowType =
+Schema schema =
 exps.stream()
-.map(exp -> RowType.newField(
+.map(exp -> Schema.Field.of(
 
 Review comment:
   `Schema.newField()` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86095)
Time Spent: 1h  (was: 50m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86094=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86094
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:44
Start Date: 30/Mar/18 17:44
Worklog Time Spent: 10m 
  Work Description: akedin commented on a change in pull request #4964: 
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178336155
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
 ##
 @@ -281,7 +281,7 @@ public void testIsNullAndIsNotNull() throws Exception {
   }
 
   @Override protected PCollection<Row> getTestPCollection() {
-RowType type = RowSqlType.builder()
+Schema type = RowSqlType.builder()
 
 Review comment:
   Now that we have all type info in core, we should have the builder there as 
well: `Schema.builder().withInt32().build()`
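   A toy sketch of what such a core-level builder might look like (all names below are 
hypothetical, not the actual Beam API; it only illustrates the fluent style suggested 
above):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SchemaBuilderSketch {
  enum FieldType { INT32, STRING, DATETIME }

  static final class Field {
    final String name;
    final FieldType type;
    Field(String name, FieldType type) { this.name = name; this.type = type; }
  }

  static final class Schema {
    final List<Field> fields;
    Schema(List<Field> fields) { this.fields = Collections.unmodifiableList(fields); }
    static Builder builder() { return new Builder(); }
  }

  // Fluent builder living next to the schema itself, so SQL does not need its
  // own RowSqlType.builder() wrapper.
  static final class Builder {
    private final List<Field> fields = new ArrayList<>();
    Builder withInt32Field(String name) { fields.add(new Field(name, FieldType.INT32)); return this; }
    Builder withStringField(String name) { fields.add(new Field(name, FieldType.STRING)); return this; }
    Schema build() { return new Schema(fields); }
  }

  public static void main(String[] args) {
    Schema schema = Schema.builder()
        .withInt32Field("order_id")
        .withStringField("customer")
        .build();
    System.out.println(schema.fields.size()); // 2
  }
}
```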


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86094)
Time Spent: 50m  (was: 40m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=85445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85445
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 28/Mar/18 23:47
Start Date: 28/Mar/18 23:47
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-377074839
 
 
   FYI the test breakage is a known issue 
(https://issues.apache.org/jira/browse/BEAM-3964) and is unrelated to this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85445)
Time Spent: 40m  (was: 0.5h)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=85133=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85133
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 28/Mar/18 04:55
Start Date: 28/Mar/18 04:55
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #4964: [BEAM-3437] Introduce 
Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-376761217
 
 
   Looking


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85133)
Time Spent: 0.5h  (was: 20m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=85112=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85112
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 28/Mar/18 02:24
Start Date: 28/Mar/18 02:24
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-376738825
 
 
   @jbonofre FYI. This PR excludes SchemaCoder and SchemaRegistry - for now it 
is for BeamSQL only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85112)
Time Spent: 20m  (was: 10m)

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3437) Support schema in PCollections

2018-03-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=85111=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85111
 ]

ASF GitHub Bot logged work on BEAM-3437:


Author: ASF GitHub Bot
Created on: 28/Mar/18 02:23
Start Date: 28/Mar/18 02:23
Worklog Time Spent: 10m 
  Work Description: reuvenlax opened a new pull request #4964: [BEAM-3437] 
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964
 
 
   We introduce the new Schema and Row classes. In this pull request the classes are 
used only in BeamSQL. This replaces the previous BeamSQL RowType class, which was a 
mapping from field name to coder. Nested arrays and rows are fully supported.
   
   Future PRs will add support for schemas on any PCollection.
   
   R: @akedin 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85111)
Time Spent: 10m
Remaining Estimate: 0h

> Support schema in PCollections
> --
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
>  Issue Type: Wish
>  Components: beam-model
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema 
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark 
> runner with data frames for instance).
> A technical draft document has been created: 
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=BhykQIs=5a203b46=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion" 
> PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)