Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Niel Markwick via dev
Regarding ordering: anything that requires inputs to be in a specific
order in Beam will be problematic due to the nature of parallel processing -
you will always get race conditions.

Assuming you are still intending to Flatten the BigQuery and Pub/Sub
PCollections, using Wait.on() before flattening the two PCollections will not
make much difference, as there is still a strong likelihood that the BQ and
Pub/Sub records will be interleaved in the flattened output.
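
For concreteness, the Wait.on() shape being discussed looks roughly like this
(a sketch only, based on the pipeline described earlier in this thread;
bqQuery, inputSubscriptionId and the element types are placeholders):

// Bounded read from BigQuery, converted to JSON strings.
PCollection<String> bqStream = input
    .apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));

// Unbounded Pub/Sub read, held back until the bounded BQ read has completed.
PCollection<PubsubMessage> pubsubStream = input
    .apply("Read From Pubsub",
        PubsubIO.readMessagesWithAttributesAndMessageId()
            .fromSubscription(inputSubscriptionId))
    .apply("Wait for BQ", Wait.on(bqStream));

// Both collections still need a common element type before Flatten, and even
// then the BQ and Pub/Sub elements may interleave downstream.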

If the BQ read is used to update internal state, then I assume that you need
to store that state somewhere in your BusinessLogic DoFn. If this storage is
in RAM, then every worker instance of your BusinessLogic DoFn needs access to
all records of that BQ read, and the only way to do that is through a side
input. If you send this data through the normal input and have multiple
workers, each worker will only receive some, not all, of the BQ records, so
each worker's internal state would be inconsistent.

 > Using big query client would mean we would have to run individual
queries for each of these 300k keys from the BusinessLogic() dofn which
operates in a global window KV

Or read all the BigQuery records at once on BusinessLogic() startup and
store them in the internal state ... which ends up being the same as using
a side input.
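
A minimal sketch of the side-input shape (assuming the BQ rows can be keyed
into a map; the "key"/"value" column names and String types are illustrative):

// Read the bounded BQ state once and materialise it as a map side input.
PCollectionView<Map<String, String>> stateView = input
    .apply("Read state from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("Key rows",
        MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((TableRow row) ->
                KV.of((String) row.get("key"), (String) row.get("value"))))
    .apply(View.asMap());

// Every worker instance of the BusinessLogic DoFn sees the complete map.
pubsubStream.apply("Business Logic",
    ParDo.of(new BusinessLogic(stateView)).withSideInputs(stateView));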





On Wed, 1 Mar 2023 at 02:15, Sahil Modak via dev 
wrote:

> The number of keys/data in BQ would not be constant and grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5kb per
> key. Both the count of the keys and the size of the key would be feature
> dependent (based on the upstream pipelines) and we won't have control over
> this in the future.
>
> Using big query client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() dofn which operates in
> a global window KV
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>
> I will explore using Wait.on(bigquery) before pub/sub read since I am not
> sure if side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out of order messages. So I want to
>> think about what are the data dependencies that result in the requirement
>> of order. Or if there are none and you just want to know that all the past
>> data has been processed, Niel's idea is one solution. It isn't parallel,
>> though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>>> DoFn from BQ.
>>>> BQ has data about the state spec, and we would like to make sure that
>>>> the state specs in our BusinessLogic() dofn are initialized before it
>>>> starts consuming the pub/sub.
>>>>
>>>> This is for handling the case of redeployment of the dataflow jobs so
>>>> that the states are preserved and the BusinessLogic() can work seamlessly
>>>> as it was previously. All our dofns are operating in a global window and do
>>>> not perform any aggregation.
>>>>
>>>> We are currently using Redis to preserve the state spec information but
>>>> would like to explore using BQ as an alternative to Redis.
>>>>
>>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> My suggestion is to try to solve the problem in terms of what you want
>>>>> to compute. Instead of trying to control the operational aspects like 

Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Niel Markwick via dev
Why not pass the BQ data as a side input to your transform?

Side inputs are read fully and materialised before the transform starts.

This will allow your transform to initialize its state before processing
any elements from the PubSub input.
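
For example, a sketch of the consuming DoFn (the view type and constructor are
illustrative):

// The map view is fully materialised before any Pub/Sub element reaches
// processElement, so the DoFn can rebuild its state from it on first use.
class BusinessLogic extends DoFn<PubsubMessage, String> {
  private final PCollectionView<Map<String, String>> stateView;

  BusinessLogic(PCollectionView<Map<String, String>> stateView) {
    this.stateView = stateView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Map<String, String> stateFromBq = c.sideInput(stateView);
    // ... initialise / consult state, then process c.element() ...
  }
}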

On Mon, 27 Feb 2023, 20:43 Daniel Collins via dev, 
wrote:

> It sounds like what you're doing here might be best done outside the beam
> model. Instead of performing the initial computation reading from BQ into a
> PCollection, perform it using the BigQuery client library in the same
> manner as you currently do to load the data from redis.
>
> On Mon, Feb 27, 2023 at 2:07 PM Sahil Modak via dev 
> wrote:
>
>> We are trying to re-initialize our state specs in the BusinessLogic()
>> DoFn from BQ.
>> BQ has data about the state spec, and we would like to make sure that the
>> state specs in our BusinessLogic() dofn are initialized before it starts
>> consuming the pub/sub.
>>
>> This is for handling the case of redeployment of the dataflow jobs so
>> that the states are preserved and the BusinessLogic() can work seamlessly
>> as it was previously. All our dofns are operating in a global window and do
>> not perform any aggregation.
>>
>> We are currently using Redis to preserve the state spec information but
>> would like to explore using BQ as an alternative to Redis.
>>
>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:
>>
>>> My suggestion is to try to solve the problem in terms of what you want
>>> to compute. Instead of trying to control the operational aspects like "read
>>> all the BQ before reading Pubsub" there is presumably some reason that the
>>> BQ data naturally "comes first", for example if its timestamps are earlier
>>> or if there is a join or an aggregation that must include it. Whenever you
>>> think you want to set up an operational dependency between two things that
>>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>>> and what you are trying to compute, and the built-in dependencies will
>>> solve the ordering problems.
>>>
>>> So - is there a way to describe your problem in terms of the data and
>>> what you are trying to compute?
>>>
>>> Kenn
>>>
>>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
>>> wrote:
>>>
 First, PCollections are completely unordered, so there is no guarantee
 on what order you'll see events in the flattened PCollection.

 There may be ways to process the BigQuery data in a separate transform
 first, but it depends on the structure of the data. How large is the
 BigQuery table? Are you doing any windowed aggregations here?

 Reuven

 On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
 smo...@paloaltonetworks.com> wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about existing implementation v/s what we want to
> achieve.
>
> Current implementation:
> Reading from pub-sub:
>
> Pipeline input = Pipeline.create(options);
>
> PCollection pubsubStream = input.apply("Read From Pubsub",
>     PubsubIO.readMessagesWithAttributesAndMessageId()
>         .fromSubscription(inputSubscriptionId));
>
>
> Reading from bigquery:
>
> PCollection bqStream = input.apply("Read from BQ",
>         BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>
>
> Merge the inputs:
>
> PCollection mergedInput =
>     PCollectionList.of(pubsubStream).and(bqStream)
>         .apply("Merge Input", Flatten.pCollections());
>
>
> Business Logic:
>
> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>
>
>
> Above logic is what we use currently in our pipeline.
>
> We want to make sure that we read from BigQuery first & pass the bqStream 
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
>
> Thanks,
>
> Sahil
>
>
> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>
>> Can you explain this use case some more? Is this a streaming
>> pipeline? If so, how are you reading from BigQuery?
>>
>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi,
>>>
>>> We have a requirement wherein we are consuming input from pub/sub
>>> (PubSubIO) as well as BQ (BQIO)
>>>
>>> We want to make sure that we consume the BQ stream first before we
>>> start consuming the data from pub-sub. Is there a way to achieve this? 
>>> Can
>>> you please help with some code samples?
>>>
>>> Currently, we read data from big query using BigQueryIO into a
>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>> flatten transform in this manner.
>>>
>>> 

Adding transactional writer to SpannerIO

2020-10-13 Thread Niel Markwick
Hey Beam-dev...

I recently had an interaction with a customer that wanted to run a
read-update-write transform on a Cloud Spanner DB inside a streaming Beam
pipeline. I suggested writing their own DoFn, and pointed them at some of
the various pitfalls they need to avoid (at least those that have been
found and fixed in the Beam SpannerIO.Write transform!)

This is not the first time I have had this request, and I was thinking
about how to introduce a generic transactional RW Spanner writer: The user
would supply a serializable function that takes the input element and
performs the read-update-write, while the transform wraps this function in
the code required to handle the Spanner connection and transaction,
potentially adding batching -- running multiple transactions at once.
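
To make the shape concrete, a very rough sketch of the per-element logic (the
table, columns and client setup are purely illustrative; in the proposed
transform the read-update-write body would be the user-supplied function, and
everything else would be owned by the transform):

class ReadUpdateWriteFn extends DoFn<String, Void> {
  private transient DatabaseClient client;

  @Setup
  public void setup() {
    // Connection handling (plus batching, retries, metrics) would live in the
    // transform, not in user code.
    Spanner spanner = SpannerOptions.newBuilder().build().getService();
    client = spanner.getDatabaseClient(
        DatabaseId.of("my-project", "my-instance", "my-db"));
  }

  @ProcessElement
  public void processElement(@Element String accountId) {
    client.readWriteTransaction().run(txn -> {
      // User-supplied read-update-write, all inside one Spanner transaction.
      Struct row = txn.readRow("Accounts", Key.of(accountId),
          Collections.singletonList("Balance"));
      txn.buffer(Mutation.newUpdateBuilder("Accounts")
          .set("AccountId").to(accountId)
          .set("Balance").to(row.getLong("Balance") + 10)
          .build());
      return null;
    });
  }
}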

Would this be something that the community could find useful? Should I
productionize the PoC I have and submit a PR?

In one sense it is against the 'repeatable
<https://beam.apache.org/documentation/programming-guide/#user-code-idempotence>'
recommendation of a DoFn (for example, a transaction that increments a DB
counter would not be idempotent), but in another sense, it makes certain
actions more reliable (eg processing bank account transfers).

All opinions welcome.



Re: Could someone review my pull request 12695 ?

2020-09-03 Thread Niel Markwick
I have a bug/PR open on the spanner client libraries, but replacing
ImmutableList with List would be a breaking API change for them compared to
an already-released version, and so is unlikely to be fixed soon.

On Wed, 2 Sep 2020, 01:09 terry xian,  wrote:

>
> As nielm pointed out in his comment of my PR, I added this because
>
> "spanner API change that exposes Guava classes is:
> googleapis/java-spanner/pull/81
> ,
>
> Specifically, adding AsyncResultSet in
> googleapis/java-spanner/pull/81/files#diff-7a9cb34faeb259be46b44f1878b7210f
>
> 
> which returns an ImmutableList."
>
>
> Without this addition, the ApiSurface test would fail, please see:
> beam_PreCommit_Java_Commit #13264 test - testGcpApiSurface [Jenkins].
> So I was suggested to add new exposed class explicitly.
>
>
> Thanks!
>
>
>
>
> On Tuesday, September 1, 2020, 03:48:37 PM PDT, Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>
> BTW this PR adds the following to the API surface.
>
> (com.google.common.collect.ImmutableCollection.class),
> (com.google.common.collect.ImmutableCollection.Builder.class),
> (com.google.common.collect.ImmutableList.class),
> (com.google.common.collect.ImmutableList.Builder.class),
> (com.google.common.collect.UnmodifiableIterator.class),
> (com.google.common.collect.UnmodifiableListIterator.class),
>
> Any objections to this ?
>
> Terry, could you explain the reason for adding this.
>
> Thanks,
> Cham
>
> On Tue, Sep 1, 2020 at 2:40 PM Chamikara Jayalath 
> wrote:
>
> LGTM. We can merge when tests pass.
>
> Thanks,
> Cham
>
> On Tue, Sep 1, 2020 at 1:32 PM terry xian  wrote:
>
> Hi,
>
> My pull request, [BEAM-8758] Google-cloud-spanner upgrade to 1.59.0 and
> google_cloud_bigtable_client_core to 1.16.0 by terryxian78 · Pull Request
> #12695 · apache/beam, has been open for more than 3 days. Although I've
> added a reviewer (lukecwik), I am afraid that I missed something which
> might cause the PR to go unnoticed (it is my first PR in Beam). I've asked
> some folks who work on Spanner to review my change, but I need a committer
> for approval.
>
> Could a committer review my PR?
>
> Thanks!
>
>
>
>
>
>
>


Re: Apache Beam ZeroMQ connector

2020-06-25 Thread Niel Markwick
I had a customer recently ask me for something similar...

My response was that there is nothing in Beam directly, but an existing
message queue source (such as rabbitmq) could be copied and adapted to use
ZeroMQ. The actual amount of code that would need to be modified is very
little: the majority of the code is for configuration of the connector.

A non-serious over-engineered alternative would be ZMQ->Kafka->Beam...



On Wed, 24 Jun 2020, 22:50 Luke Cwik,  wrote:

> I'm not aware of any ZeroMQ connector implementations that are part of
> Apache Beam.
>
> On Wed, Jun 24, 2020 at 11:44 AM Sherif A. Kozman <
> sherif.koz...@extremesolution.com> wrote:
>
>> Hello,
>>
>> We were in the process of planning a deployment of exporting stream data
>> from Aruba Networks Analytics engine through Apache beam and it turns out
>> that it utilizes ZeroMQ for messaging.
>> We couldn't find any ZeroMQ connectors and were wondering if one exists
>> or would be compatible with other connectors for Apache Beam.
>>
>> Thanks
>> Sherif Kozman
>>
>> 
>>
>>
>>
>>


Request for Java PR review

2020-06-23 Thread Niel Markwick
Hey devs...

I have 3 PRs sitting waiting for a code review to fix potential bugs (and
improve memory use) in SpannerIO. 2 small, and one quite large -- I would
really like these to be in 2.23...

https://github.com/apache/beam/pulls/nielm

Would someone be willing to have a look?

Thanks!



How to submit PRs for dependent changes?

2020-04-26 Thread Niel Markwick
Hey Beam devs...

I have 4 changes to submit as PRs to fix 4 independent issues in the
io.gcp.SpannerIO class.

The PRs are notionally independent, but will cause merge conflicts if
submitted separately, as the fix for each issue will change code related to
the fix for some of the others.

How do you prefer the PRs to be submitted?

a) one single PR with 4 sequential commits within it
b) one single PR with all changes squashed.
c) 4 separate conflicting PRs which will have to be merged separately, and
a merge conflict resolution after each one.

a) is how it is in my repo.
b) would be easy, but less clear what the changes were for.
c) I guess would be clearest in the Beam changelog.

If the answer is a) or b), how would I specify multiple JIRA tickets in the
PR title?

Thanks!



Re: pipeline steps

2019-02-10 Thread Niel Markwick
This would have to flow through to the other IO wrappers as well, perhaps
outputting a KV

I recently wrote an AvroIO parseAllGenericRecords() equivalent transform,
because I was reading files of various schemas and needed the parseFn
to know both the filename currently being read and to use some side input...

It ended up being quite complex - especially as I wanted to shard the file
read, like AvroIO already does - and I basically re-implemented part of
AvroIO for my use-case...

@Chaim, one simpler option could be to use parseGenericRecords and use the
*name* of the Avro schema in the GenericRecord as a way to determine the
table name - this may mean that you have to change the way your Avro files
are being written...
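
Roughly something like this (a sketch; emitting the record in its JSON string
form and the bucket path are just for illustration):

// Pair each record with its writer schema's name, which can then drive the
// BigQuery table choice (e.g. via BigQueryIO's dynamic destinations).
PCollection<KV<String, String>> tableAndJson = pipeline.apply("Parse Avro",
    AvroIO.parseGenericRecords(
            (GenericRecord r) -> KV.of(r.getSchema().getName(), r.toString()))
        .from("gs://my-bucket/input/*.avro")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));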




On Sun, 10 Feb 2019, 07:03 Reuven Lax,  wrote:

> I think we could definitely add an option to FileIO to add the filename to
> every record. It would come at a (performance) cost - often the filename is
> much larger than the actual record..
>
> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles  wrote:
>
>> This comes up a lot, wanting file names alongside the data that came from
>> the file. It is a historical quirk that none of our connectors used to have
>> the file names. What is the change needed for FileIO + parse Avro to be
>> really easy to use?
>>
>> Kenn
>>
>> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas  wrote:
>>
>>> I haven't needed to do this with Beam before, but I've definitely had
>>> similar needs in the past. Spark, for example, provides an input_file_name
>>> function that can be applied to a dataframe to add the input file as an
>>> additional column. It's not clear to me how that's implemented, though.
>>>
>>> Perhaps others have suggestions, but I'm not aware of a way to do this
>>> conveniently in Beam today. To my knowledge, today you would have to use
>>> FileIO.match() and FileIO.readMatches() to get a collection of
>>> ReadableFile. You'd then have to FlatMapElements to pull out the metadata
>>> and the bytes of the file, and you'd be responsible for parsing those bytes
>>> into avro records. You'd  be able to output something like a KV
>>> that groups the file name together with the parsed avro record.
>>>
>>> Seems like something worth providing better support for in Beam itself
>>> if this indeed doesn't already exist.
>>>
>>> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel  wrote:
>>>
 Hi,
   I am working on a pipeline that listens to a topic on pubsub to get
 files that have changes in the storage. Then i read avro files, and
 would like to write them to bigquery based on the file name (to
 different tables).
   My problem is that the transformer that reads the avro does not give
 me back the file's name (like a tuple or something like that). I seem
 to have this pattern come back a lot.
 Can you think of any solutions?

 Chaim


>>>


Re: AvroIO read from unknown schema to generic record.

2019-01-13 Thread Niel Markwick
This is my point though: AvroIO.parseAllGenericRecords() is able to decode
the object from the avro file into a GenericRecord _without_ knowing the
schema in advance, as it uses the writer schema embedded in the file.

So can there be a GenericRecordAvroCoder which uses the schema embedded in
the GenericRecord to encode itself?
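
Something along these lines, as a quick sketch (not an existing Beam coder, and
it pays the cost of writing the schema with every element, which is the expense
mentioned below):

// Sketch of a coder that embeds each record's own schema in its encoded form.
class SelfDescribingGenericRecordCoder extends CustomCoder<GenericRecord> {
  @Override
  public void encode(GenericRecord value, OutputStream out) throws IOException {
    // Write the record's own (writer) schema, then the Avro-encoded datum.
    StringUtf8Coder.of().encode(value.getSchema().toString(), out);
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(value.getSchema()).write(value, encoder);
    encoder.flush();
  }

  @Override
  public GenericRecord decode(InputStream in) throws IOException {
    Schema schema = new Schema.Parser().parse(StringUtf8Coder.of().decode(in));
    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
  }
}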

On Sun, 13 Jan 2019, 16:59 Reuven Lax,  wrote:

> AvroCoder needs to know the schema of the object in order to decode the
> object. Remember that in Beam the set of PCollections in a graph is static,
> so all the coders need to be known up front. To make things work with
> parseAllGenericRecords I think you would either need to embed the schema in
> every single record (which would be expensive), or you would need to create
> a new union type to represent the possible types (assuming that you know
> the possible schemas ahead of time).
>
> On Sat, Jan 12, 2019 at 12:09 PM Niel Markwick  wrote:
>
>> Considering the transform is reading Avro container files, which by
>> definition
>> <https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files>
>> contain a schema, it should be possible for the reader to infer the schema
>> from the file...
>>
>> parseAllGenericRecords()
>> <https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/AvroIO.html#parseAllGenericRecords-org.apache.beam.sdk.transforms.SerializableFunction->
>>  seems
>> to be able to do this, decodes and passes a GenericRecord to the
>> given parseFn without needing to know the schema in advance...
>>
>> In fact parseAllGenericRecords() would be perfect for my requirements if
>> I could use a Contextful.Fn as a parseFn that accepted side inputs :/
>>
>>
>>
>>
>>
>> On Sat, 12 Jan 2019 at 20:08, Alex Van Boxel  wrote:
>>
>>> Hey Niels,
>>>
>>> The reason you need to specify the schema to GenericRecord is that
>>> without it it's *impossible* for GenericRecord to make any sense of the
>>> binary data. Unlike protobuf, avro doesn't have any kind of information in
>>> the message about the structure. This makes it smaller, but impossible to
>>> decode without the schema.
>>>
>>> So if you really want to do flexible messages, I would read it binary,
>>> message per message and handle your schema switching into a DoFn.
>>>
>>>  _/
>>> _/ Alex Van Boxel
>>>
>>>
>>> On Sat, Jan 12, 2019 at 7:44 PM Niel Markwick  wrote:
>>>
>>>> Is there a reason why we don't have an AvroIO reader that reads and
>>>> outputs a GenericRecord without requiring any schema to be given?
>>>>
>>>> Does passing the schema into readGenericRecord() have any benefits
>>>> other than verifying that the avro file has records of the same schema?
>>>>
>>>> This could be useful for parsing a collection of avro files of varying
>>>> schemas, then post-processing the GenericRecords in further transform with
>>>> side inputs.
>>>>


Re: AvroIO read from unknown schema to generic record.

2019-01-13 Thread Niel Markwick
Thanks.

So AvroIO.parseAllGenericRecords() uses the writer schema to read the avro
file and creates a GenericRecord using this same writer schema. This is
then given to the parseFn SerializableFunction to convert into some other
object - no other schemas are supplied.

What I am suggesting is there would be a AvroIO.readAllGenericRecords()
which similarly uses the writer schema create a GenericRecord as its output
allowing the pipeline to process it further.



On Sun, 13 Jan 2019, 04:58 Gijun Lee,  wrote:

> The schema attached to an avro file is "writer schema" which means that
> this schema is required to encode/decode avro records.  This schema is very
> critical because the records in the avro file cannot be converted to
> Generic Record objects or even specific records without this writer
> schema.  The schema that is being passed as an argument to the API is not
> "writer" schema.  This schema is "reader" schema.  This schema is the same
> as the writer schema in most of the time, but this reader schema can be an
> "evolved" schema from the writer schema. If you look at AVRO APIs,  the
> APIs uses the two schemas internally.
>
> Hope this helps.
>
>
>
>
> On Sat, Jan 12, 2019 at 3:09 PM Niel Markwick  wrote:
>
>> Considering the transform is reading Avro container files, which by
>> definition
>> <https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files>
>> contain a schema, it should be possible for the reader to infer the schema
>> from the file...
>>
>> parseAllGenericRecords()
>> <https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/AvroIO.html#parseAllGenericRecords-org.apache.beam.sdk.transforms.SerializableFunction->
>>  seems
>> to be able to do this, decodes and passes a GenericRecord to the
>> given parseFn without needing to know the schema in advance...
>>
>> In fact parseAllGenericRecords() would be perfect for my requirements if
>> I could use a Contextful.Fn as a parseFn that accepted side inputs :/
>>
>>
>>
>>
>>
>> On Sat, 12 Jan 2019 at 20:08, Alex Van Boxel  wrote:
>>
>>> Hey Niels,
>>>
>>> The reason you need to specify the schema to GenericRecord is that
>>> without it it's *impossible* for GenericRecord to make any sense of the
>>> binary data. Unlike protobuf, avro doesn't have any kind of information in
>>> the message about the structure. This makes it smaller, but impossible to
>>> decode without the schema.
>>>
>>> So if you really want to do flexible messages, I would read it binary,
>>> message per message and handle your schema switching into a DoFn.
>>>
>>>  _/
>>> _/ Alex Van Boxel
>>>
>>>
>>> On Sat, Jan 12, 2019 at 7:44 PM Niel Markwick  wrote:
>>>
>>>> Is there a reason why we don't have an AvroIO reader that reads and
>>>> outputs a GenericRecord without requiring any schema to be given?
>>>>
>>>> Does passing the schema into readGenericRecord() have any benefits
>>>> other than verifying that the avro file has records of the same schema?
>>>>
>>>> This could be useful for parsing a collection of avro files of varying
>>>> schemas, then post-processing the GenericRecords in further transform with
>>>> side inputs.
>>>>


Re: AvroIO read from unknown schema to generic record.

2019-01-12 Thread Niel Markwick
Considering the transform is reading Avro container files, which by definition
<https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files>
contain a schema, it should be possible for the reader to infer the schema
from the file...

parseAllGenericRecords()
<https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/AvroIO.html#parseAllGenericRecords-org.apache.beam.sdk.transforms.SerializableFunction->
seems to be able to do this: it decodes and passes a GenericRecord to the
given parseFn without needing to know the schema in advance...

In fact parseAllGenericRecords() would be perfect for my requirements if I
could use a Contextful.Fn as a parseFn that accepted side inputs :/
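
For reference, the existing entry point in a minimal form (the parseFn here is
just illustrative):

// parseAllGenericRecords: the writer schema embedded in each file is used to
// decode; the parseFn maps each GenericRecord to a type with an ordinary coder.
PCollection<String> parsed = p
    .apply("File patterns", Create.of("gs://my-bucket/avro/*.avro"))
    .apply("Parse Avro",
        AvroIO.parseAllGenericRecords(
                (GenericRecord r) -> r.getSchema().getName() + ": " + r)
            .withCoder(StringUtf8Coder.of()));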





On Sat, 12 Jan 2019 at 20:08, Alex Van Boxel  wrote:

> Hey Niels,
>
> The reason you need to specify the schema to GenericRecord is that without
> it it's *impossible* for GenericRecord to make any sense of the binary
> data. Unlike protobuf, avro doesn't have any kind of information in the
> message about the structure. This makes it smaller, but impossible to
> decode without the schema.
>
> So if you really want to do flexible messages, I would read it binary,
> message per message and handle your schema switching into a DoFn.
>
>  _/
> _/ Alex Van Boxel
>
>
> On Sat, Jan 12, 2019 at 7:44 PM Niel Markwick  wrote:
>
>> Is there a reason why we don't have an AvroIO reader that reads and
>> outputs a GenericRecord without requiring any schema to be given?
>>
>> Does passing the schema into readGenericRecord() have any benefits other
>> than verifying that the avro file has records of the same schema?
>>
>> This could be useful for parsing a collection of avro files of varying
>> schemas, then post-processing the GenericRecords in further transform with
>> side inputs.
>>
>>
>


AvroIO read from unknown schema to generic record.

2019-01-12 Thread Niel Markwick
Is there a reason why we don't have an AvroIO reader that reads and outputs
a GenericRecord without requiring any schema to be given?

Does passing the schema into readGenericRecord() have any benefits other
than verifying that the avro file has records of the same schema?

This could be useful for parsing a collection of avro files of varying
schemas, then post-processing the GenericRecords in further transform with
side inputs.



regression in 2.9.0 - FileIO.write with dynamic naming and DirectRunner.

2019-01-11 Thread Niel Markwick
I have found and narrowed down a regression in 2.9.0 (and 2.10.0/head)
where:

   - If you use DirectRunner (or TestPipeline which uses DirectRunner)
   - AND you use FileIO.writeDynamic()
   - AND you have a side input to the Contextful.Fn
   - AND you do not limit to a single shard
   - Then the pipeline will fail.


java.lang.IllegalStateException: All PCollectionViews that are consumed
must be written by some WriteView PTransform: Missing [
[RunnerPCollectionView]]
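
A minimal pipeline shape that reproduces it looks roughly like this (a sketch
only; element types, destinations and naming are illustrative):

// writeDynamic + a side input in the Contextful output fn + no fixed shard
// count, so the DirectRunner's sharding override applies.
PCollectionView<String> suffixView =
    p.apply("Suffix", Create.of("-v1")).apply(View.asSingleton());

p.apply("Data", Create.of("apple", "banana"))
    .apply(FileIO.<String, String>writeDynamic()
        .by(element -> element.substring(0, 1))
        .via(
            Contextful.fn(
                (String element, Contextful.Fn.Context c) ->
                    element + c.sideInput(suffixView),
                Requirements.requiresSideInputs(suffixView)),
            TextIO.sink())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt"))
        .to("/tmp/out"));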

This is due to the DirectRunner using TransformOverrides re-writing FileIO
sinks to use runner-determined-sharding (see DirectRunner.java line 226
<https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L226>).

No idea why this occurs or why it started failing in 2.9.0...

Raised https://issues.apache.org/jira/browse/BEAM-6407





Re: [PROPOSAL] Move sorting to sdks-java-core

2018-10-18 Thread Niel Markwick
FYI: the BufferedExternalSorter depends on Hadoop client libraries
(specifically hadoop_mapreduce_client_core and hadoop_common), but not on
the Hadoop service -- because the ExternalSorter uses Hadoop's SequenceFile
for on-disk sorting.



On Thu, 18 Oct 2018 at 11:19 David Morávek  wrote:

> Kenn, I believe we should not introduce a hadoop dependency to either sdks
> or runners. We may split sorting into two packages, one with the
> transformation + in memory implementation (this is the part I'd love to see
> become part of sdks-java-core) and second module with more robust external
> sorter (with hadoop dep).
>
> Does this make sense?
>
>
> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin  wrote:
>
>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles  wrote:
>>
>>> The runner can always just depend on the sorter to do it the legacy way
>>> by class matching; it shouldn't incur other dependency penalties... but now
>>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>>> price to pay for a user in any event. Are those Hadoop deps reasonably
>>> self-contained?
>>>
>>
>> Nice catch, Kenn! This is indeed why we didn't originally include the
>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>> time.
>>
>> Dan
>>
>>
>>>
>>> Kenn
>>>
>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik  wrote:
>>>
 Merging the sorter into sdks-java-core isn't needed for pipelines
 executed via portability since the Runner will be able to perform
 PTransform replacement and optimization based upon the URN of the transform
 and its payload so it would never need to have the "Sorter" class in its
 classpath.

 I'm ambivalent about whether merging it now is worth it.

 On Wed, Oct 17, 2018 at 2:31 PM David Morávek 
 wrote:

> We can always fall back to the External sorter in case of merging
> windows. I reckon in this case, values usually fit in memory, so it would
> not be an issue.
>
> In case of non-merging windows, runner implementation would probably
> require to group elements also by window during shuffle.
>
> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax  wrote:
>
>> One concern would be merging windows. This happens after shuffle, so
>> even if the shuffle were sorted you would need to do a sorted merge of 
>> two
>> sorted buffers.
>>
>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <
>> david.mora...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I want to summarize my thoughts on the per key value sorting.
>>>
>>> Currently we have a separate module for sorting extension. The
>>> extension contains *SortValues* transformation and implementations
>>> of different sorters.
>>>
>>> Performance-wise it would be great to be able* to delegate sorting
>>> to a runner* if it supports sort based shuffle. In order to do so,
>>> we should *move SortValues transformation to sdks-java-core*, so a
>>> runner can easily provide its own implementation.
>>>
>>> The robust implementation is needed mainly for building of HFiles
>>> for the HBase bulk load. When using external sorter, we often sort the
>>> whole data set twice (shuffle may already did a job).
>>>
>>> SortValues can not use custom comparator, because we want to be able
>>> to push sorting logic down to a byte based shuffle.
>>>
>>> The usage of SortValues transformation is little bit confusing. I
>>> think we should add a *SortValues.perKey* method, which accepts a
>>> secondary key extractor and coder, as the usage would be easier to
>>> understand. Also, this explicitly states, that we sort values
>>> *perKey* only and that we sort using an *encoded secondary key*.
>>> Example usage:
>>>
>>>
>>> *PCollection> input = ...;*
>>> *input.apply(SortValues.perKey(KV::getValue,
>>> BigEndianLongCoder.of()))*
>>>
>>> What do you think? Is this the right direction?
>>>
>>> Thanks for the comments!
>>>
>>> Links:
>>> -
>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>
>>


Re: Please add me as a Jira contributor

2018-10-04 Thread Niel Markwick
yes - that slack link (https://s.apache.org/slack-invite) goes to a Slack
page saying:
*This invite link is no longer active*

(and thanks!)

On Thu, 4 Oct 2018 at 22:40 Kenneth Knowles  wrote:

> Added you to JIRA role.
>
> Are you referring to the link(s) on
> https://beam.apache.org/community/contact-us/
>
> Kenn
>
> On Thu, Oct 4, 2018 at 12:49 PM Niel Markwick  wrote:
>
>> Please can you add me as a contributor in the Beam issue tracker.. so I
>> can assign some issues to myself :)
>>
>> I would also like to mention that the link to join the ASF slack channel
>> does not work...


Please add me as a Jira contributor

2018-10-04 Thread Niel Markwick
Please can you add me as a contributor in the Beam issue tracker.. so I can
assign some issues to myself :)

I would also like to mention that the link to join the ASF slack channel
does not work...