Use shared invites for Beam Slack Channel

2018-03-13 Thread Dan Halperin
Slack now has a publicly-endorsed way to post a public shared invite:
https://my.slack.com/admin/shared_invites

The downside is that it apparently has to be renewed monthly.

Maybe Beam should use that instead? The tradeoffs are not obvious, but it
seems like a win:

* with the shared invite: if we forget to renew, people can't sign up (and
can reach out to user@?)
vs
* now: we always force them to mail user@

Dan

On Tue, Mar 13, 2018 at 11:54 AM, Lukasz Cwik  wrote:

> Invite sent, welcome.
>
> On Tue, Mar 13, 2018 at 11:08 AM, Ramjee Ganti 
> wrote:
>
>> Hello,
>>
>> I have been using Apache Beam and Dataflow for the last few months. Can
>> someone please add me to the Beam slack channel?
>>
>> Thanks
>> Ramjee
>> http://ramjeeganti.com
>>
>
>


Re: "Radically modular data ingestion APIs in Apache Beam" @ Strata - slides available

2018-03-08 Thread Dan Halperin
Looks like it was a good talk! Why is it Google Confidential & Proprietary,
though?

Dan

On Thu, Mar 8, 2018 at 11:49 AM, Eugene Kirpichov 
wrote:

> Hey all,
>
> The slides for my yesterday's talk at Strata San Jose https://conferences.
> oreilly.com/strata/strata-ca/public/schedule/detail/63696 have been
> posted on the talk page. They may be of interest both to users and IO
> authors.
>
> Thanks.
>


Re: Problem when trying to specify the runner in examples

2017-05-24 Thread Dan Halperin
Hi Claire,

Have you seen the Quickstarts on https://beam.apache.org? Specifically, we
hope that https://beam.apache.org/get-started/quickstart-java/ answers
these questions and has explicit instructions for running WordCount on all
runners.

Dan

On Wed, May 24, 2017 at 2:45 PM, Thomas Groh  wrote:

> You should specify the Flink profile when you execute, with
> '-Pflink-runner'. That will add the Flink dependency to your classpath when
> you execute the pipeline.
>
> On Wed, May 24, 2017 at 2:35 PM, Claire Yuan 
> wrote:
>
>> Hi all,
>>   When I tried to specify the runner in the command for the examples:
>> *mvn exec:java -Dexec.mainClass=org.apache.be
>> am.examples.WordCount
>> -Dexec.args="--output=wordcount.txt --runner=FlinkRunner" *
>>
>>   The build failed and I got the following error message:
>> *Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java
>> (default-cli) on project beam-examples-java: An exception occured while
>> executing the Java class. null: InvocationTargetException: Unknown 'runner'
>> specified 'FlinkRunner', supported pipeline runners [DirectRunner] -> [Help
>> 1]*
>>
>>I am wondering if anyone got the same issue and how you solved it.
>>
>
>


Re: BigQuery table backed by Google Sheet

2017-05-18 Thread Dan Halperin
Hi Prabeesh,

I believe to read from a sheet you have to use a fromQuery("SELECT * FROM
[sheet]"). This is because federated tables are really views and cannot be
exported to GCS directly.
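
In the Java SDK that looks roughly like the following (a sketch only -- the
table name is a placeholder, and older releases spell it
BigQueryIO.Read.fromQuery). The Python SDK's BigQuery source takes a query in
the same spirit.

PCollection<TableRow> rows = p.apply(
    BigQueryIO.read().fromQuery(
        "SELECT * FROM [my-project:dataset.sheet_backed_table]"));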

On Thu, May 18, 2017 at 6:18 PM, Chamikara Jayalath 
wrote:

> I haven't tried reading BQ tables backed by Google sheets but this error
> seems to be coming from BigQuery. Could you try contacting BQ support
> channels ?
>
> Thanks,
> Cham
>
> On Thu, May 18, 2017 at 5:26 AM Prabeesh K.  wrote:
>
>> Hi,
>>
>> while I am trying to query the table that is backed by google sheet in
>> beam python sdk getting following error. error(s): errorResult: Using
>> table  is not allowed for this operation because of its type.
>> Try using a different table that is of type TABLE., error: Using table
>>  is not allowed for this operation because of its type. Try
>> using a different table that is of type TABLE.
>>
>> Anybody help to fix this issue.
>>
>> Regards,
>> Parbeesh K.
>>
>> On 5 May 2017 at 13:05, Prabeesh K.  wrote:
>>
>>> Hi Davor,
>>>
>>> Thank for your prompt reply.
>>>
>>> Regards,
>>> Prabeesh K.
>>>
>>> On 5 May 2017 at 05:48, Davor Bonaci  wrote:
>>>
 This sounds like a Dataflow-specific question, so it might be better
 addressed by a StackOverflow question tagged with google-cloud-dataflow 
 [1].

 I'd suggest checking the Dataflow documentation for more information
 about its security model [2]. You'd likely have to use a service account,
 as described there for a few scenarios.

 Hope this helps, but please don't hesitate to post on StackOverflow for
 any Dataflow-specific questions.

 Davor

 [1] http://stackoverflow.com/questions/tagged/google-cloud-dataflow
 [2] https://cloud.google.com/dataflow/security-and-permissions

 On Wed, May 3, 2017 at 11:08 PM, Prabeesh K. 
 wrote:

> Hi Anil,
>
> Thank you for your attempt to answer the question. This is the broad
> answer.
>
> The problem is we should add additional scope to Beam to read google
> sheet and add the permission to the service account to read the google
> sheet.
>
> Regards,
> Prabeesh K.
>
>
> On 4 May 2017 at 08:58, Anil Srinivas 
> wrote:
>
>> Hi,
>> The problem with it as what I see is that you don't have the
>> permissions to access the data in the BigQuery table. Make sure you login
>> into the account which has permissions for reading/writing data in the
>> table.
>>
>>
>>
>>
>> Regards,
>> Anil
>>
>>
>> On Wed, May 3, 2017 at 6:21 PM, Prabeesh K. 
>> wrote:
>>
>>> Hi,
>>>
>>> How can we read a BigQuery table that is backed by a Google sheet?
>>>
>>> For me, I am getting the following error.
>>>
>>> "error": {
>>>   "errors": [
>>>{
>>> "domain": "global",
>>> "reason": "accessDenied",
>>> "message": "Access Denied: BigQuery BigQuery: Permission denied
>>> while globbing file pattern.",
>>> "locationType": "other",
>>> "location": "/gdrive/id/--"
>>>}
>>>   ],
>>>   "code": 403,
>>>   "message": "Access Denied: BigQuery BigQuery: Permission denied
>>> while globbing file pattern."
>>>  }
>>> }
>>>
>>>
>>> Help to fix this issue.
>>>
>>>
>>> Regards,
>>> Prabeesh K.
>>>
>>>
>>>
>>
>

>>>
>>


Re: ptransform of two views and no data

2017-05-17 Thread Dan Halperin
Hi Antony,

Can you say a little more about your use case? There are many approaches to
making this work – your solution is actually a valid and often-used
technique – but yes you might use one collection as the main input (just
don't create a view), or you might use a CoGroupByKey and no side inputs,
or maybe other things.
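
As a rough sketch of the main-input-plus-side-input option (the element types
and collection names here are placeholders, not from your code):

PCollectionView<Integer> view2 = coll2.apply(View.asSingleton());

coll1.apply(ParDo.of(new DoFn<Long, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // coll1 is the main input; the second collection arrives as a side input.
    c.output(c.element() + ":" + c.sideInput(view2));
  }
}).withSideInputs(view2));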

Thanks,
Dan

On Wed, May 17, 2017 at 5:24 AM, Antony Mayi  wrote:

> Hi,
>
> what is the best way to do some operation combining two PCollectionView
> instances that's not involving any PCollection processing? So far I've been
> running it on some fake PCollection of single element and passing the views
> as sideinputs:
>
> pipeline.apply(Create.of(fake_element)).apply(ParDo.of(...).withSideInputs(view1,
> view2)))...
>
> but the need for fake pcollection is bit awkward so wondering if it can be
> avoided?
>
> I can see in beam v2.0.0 there is PCollectionView.getPCollection() so I
> could theoretically use one of the views using main input and the other as
> sideinput... but the .getPCollection() is annotated as internal so I guess
> that's not to be used.
>
> thanks,
> antony.
>


Re: How to skip processing when specific exception is thrown?

2017-05-16 Thread Dan Halperin
Hey Josh,

There isn't really generic functionality for this as we don't want to make
"data loss" easy. There are some ongoing designs for specific transforms
(e.g., BEAM-190 for BigQueryIO). One easy thing to do in this case might be
to wrap the code in a try/catch and if you catch an exception then return
some table name like "leftovers".
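
A minimal sketch of that idea, assuming the table function has the
SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> shape
accepted by BigQueryIO.write().to(...) (the table names and the
computeTableSpec helper are placeholders):

class ExtractTableName
    implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
  @Override
  public TableDestination apply(ValueInSingleWindow<TableRow> input) {
    try {
      // computeTableSpec stands in for whatever logic might throw.
      return new TableDestination(computeTableSpec(input.getValue()), null);
    } catch (Exception e) {
      // Route the failing row to a catch-all table instead of failing the bundle.
      return new TableDestination("my-project:dataset.leftovers", null);
    }
  }

  private String computeTableSpec(TableRow row) {
    // Placeholder: derive the table name from the row; a bad row could throw here.
    return "my-project:dataset." + row.get("type");
  }
}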

Dan

On Tue, May 16, 2017 at 8:02 AM, Josh  wrote:

> Hi all,
>
> I am wondering if there is there a way to make Beam skip certain failures
> - for example I am using BigQueryIO to write to a table, where the table
> name is chosen dynamically:
>
>
> ```
>
> .apply(BigQueryIO.write()
>
> .to(new ExtractTableName()))
>
> ```
>
>
> I want to make it so that, if for some reason my ExtractTableName instance
> (which is a SerializableFunction,
> TableDestination>) throws an exception, then the exception is logged and
> the write is skipped.
>
>
> Is it possible to achieve this behaviour without modifying the Beam
> codebase/BigQueryIO retry logic?
>
> At the moment if my function throws an exception, the write is retried
> indefinitely.
>
>
> Thanks,
>
> Josh
>


Re: weird serialization issue on beam 2.0.0rc2

2017-05-11 Thread Dan Halperin
Thanks Anthony -- I was wrong here when I said that PCollectionViews don't
need to be serializable; your original report is great and your new one is
better.

Great to catch this issue now!

On Thu, May 11, 2017 at 5:16 PM, Antony Mayi  wrote:

> please find another unit test with more realistic usecase. I am compiling
> it using Java 8 and the stacktrace I get from the testGeneric is attached.
>
> thx,
> a.
>
>
> On Friday, 12 May 2017, 1:52, Thomas Groh  wrote:
>
>
> Hey Antony;
>
> I've tried to update your code to compile in Java 7 and removed the
> SerializableUtils Dan mentioned, and I can't seem to reproduce the issue.
> Can you share more about the specific issue you're having? A stack trace
> would be really helpful.
>
> Thanks,
>
> Thomas
>
> On Thu, May 11, 2017 at 4:42 PM, Dan Halperin  wrote:
>
> Hi Anthony,
>
> I'm a little confused by your code snippet:
>
> SerializableUtils.serializeToB yteArray(
> input.apply(ParDo.of(new NonGenericOutput<>())).apply(" passing",
> View.asSingleton())
> );
>
> is serializing a PCollectionView object. Those are not necessarily
> serializable and it's not clear that it makes sense to do this at all.
>
> Can you say more about what you're trying to do?
>
> Thanks,
> Dan
>
> On Thu, May 11, 2017 at 4:21 PM, Antony Mayi  wrote:
>
> I tried the 2.0.0rc2 and I started facing weird serialization issue that's
> not been happening on 0.6.0. I am able to reproduce it using the attached
> unit test - see first serialization attempt which is passing ok and second
> one slightly different (transform output type is generic) that's failing
> just on 2.0.0
>
> Can anyone find a way around it? In my real case I depend on generics
> output types.
>
> thanks,
> antony.
>
>
>
>
>
>


Re: weird serialization issue on beam 2.0.0rc2

2017-05-11 Thread Dan Halperin
Hi Anthony,

I'm a little confused by your code snippet:

SerializableUtils.serializeToByteArray(
input.apply(ParDo.of(new NonGenericOutput<>())).apply("passing",
View.asSingleton())
);

is serializing a PCollectionView object. Those are not necessarily
serializable and it's not clear that it makes sense to do this at all.

Can you say more about what you're trying to do?

Thanks,
Dan

On Thu, May 11, 2017 at 4:21 PM, Antony Mayi  wrote:

> I tried the 2.0.0rc2 and I started facing weird serialization issue that's
> not been happening on 0.6.0. I am able to reproduce it using the attached
> unit test - see first serialization attempt which is passing ok and second
> one slightly different (transform output type is generic) that's failing
> just on 2.0.0
>
> Can anyone find a way around it? In my real case I depend on generics
> output types.
>
> thanks,
> antony.


Re: HadoopInputFormatIO - HBase - Flink runner

2017-05-10 Thread Dan Halperin
Hi Seshadri,

Note that we have a native HBaseIO -- have you tried that one?
https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
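
Usage is roughly the following (a sketch -- builder methods as in the linked
source, with placeholder connection details):

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk-host");  // placeholder

PCollection<Result> rows = p.apply(
    HBaseIO.read()
        .withConfiguration(conf)
        .withTableId("my-table"));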

If that does not work for some reason... I do not believe anyone has used
HBase with that connector yet. Give it a whirl and let us know when you run
into issues!

Thanks,
Dan

On Wed, May 10, 2017 at 3:43 PM, Seshadri Raghunathan 
wrote:

> Hi,
>
> I am looking for some sample code / reference on how to read data from
> HBase using  HadoopInputFormatIO for Flink runner.
>
> Something similar to https://github.com/apache/
> beam/blob/master/sdks/java/io/hadoop/jdk1.8-tests/src/test/
> java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/
> HIFIOCassandraIT.java
> but for HBase on Flink runner.
>
> Appreciate any help in this regard !
>
> Thanks,
> Seshadri
>


Re: Testing of Metrics in context of DoFnTester

2017-05-10 Thread Dan Halperin
Hey Michael,

That TestRule sounds like it might be pretty useful generally; would it be
worth contributing back to Beam?

On Wed, May 10, 2017 at 4:12 AM, Michael Luckey  wrote:

> Hi Pablo,
>
> we ended up to follow the path with setup of the global environment.
>
> Therefore we implemented a simple junit TestRule, which is used something
> along the following:
>
> @Rule TestMetrics metrics = new TestMetrics();
>
>  
>
> @Test
>  public void invalids()  {
>  final DoFnTester doFnTester =
> DoFnTester.of(fixture);
>  doFnTester.processElement(input);
>
>  assertThat(metrics,counterValue(fixture.ctr), is(1L));
>  }
>
> Thats pretty close to what we did before and seems to be working very well.
>
> So again, thanks a lot for your help, very much appreciated,
>
> michel
>
> On Wed, May 10, 2017 at 11:01 AM, Michael Luckey 
> wrote:
>
>> Hi Pablo,
>>
>> thx for the pointers. I'll have a look into it and let you know.
>>
>> Regards,
>>
>> michel
>>
>> On Wed, May 10, 2017 at 3:18 AM, Pablo Estrada 
>> wrote:
>>
>>> Hi Michael,
>>> I'm sorry. I see I did not read your first email properly.
>>>
>>> There are a couple places in the core SDK or runner code and tests that
>>> used to use aggregators, and now use metrics. There are two reasonable
>>> options for this:
>>> 1. In [1], the test sets up the metrics global environment by setting
>>> the current container (e.g.  MetricsEnvironment.setCurrentContainer(new
>>> MetricsContainer("anystep"));), and the LateDataFilter uses metrics
>>> normally[2], by creating a counter that relies on the environment set up in
>>> the test.
>>>
>>> 2. If you'd rather not rely on setting up a global environment, you can
>>> use CounterCell, and pass it in to your test. In [3] it's not a test, but a
>>> CounterCell is still created to keep internal statistics, and later its
>>> value is checked [4]. As a note, you may note in [3] that CounterCells are
>>> a bit quirky to create, as we did not intend for external users to be able
>>> to create them.
>>>
>>> Let me know if these suggestions are helpful.
>>> Best
>>> -P.
>>>
>>> [1] - https://github.com/apache/beam/blob/master/runners/core-java
>>> /src/test/java/org/apache/beam/runners/core/LateDataDropping
>>> DoFnRunnerTest.java#L61
>>> [2] - https://github.com/apache/beam/blob/master/runners/core-java
>>> /src/main/java/org/apache/beam/runners/core/LateDataDropping
>>> DoFnRunner.java#L96
>>> [3] - https://github.com/apache/beam/blob/master/runners/spark/s
>>> rc/main/java/org/apache/beam/runners/spark/stateful/SparkGro
>>> upAlsoByWindowViaWindowSet.java#L210
>>> [4] - https://github.com/apache/beam/blob/master/runners/spark/s
>>> rc/main/java/org/apache/beam/runners/spark/stateful/SparkGro
>>> upAlsoByWindowViaWindowSet.java#L326
>>>
>>> On Tue, May 9, 2017 at 4:33 PM Michael Luckey 
>>> wrote:
>>>
 Hi Pablo,

 thanks for your help! We certainly could change our testing code and
 involve execution of a pipeline during tests.

 But currently we are leveraging DoFnTester, i.e. scoping our tests to
 the DoFn only, which means, there is neither a pipeline nor a pipeline
 result involved, which i could call upon.

 It might be a bad idea trying to test counters on this basis, but as it
 was supported previously i thought we might have overlooked an API for
 accessing these metrics somehow within DoFnTesting. Not sure, wether it
 makes sense for the DoFnTester to somehow provide Metrics-Support to enable
 this kind of tests. I certainly do not like the idea to much starting to do
 some mocking of the metrics api within my test implementation.

 Regards,

 michel


 On Wed, May 10, 2017 at 1:10 AM, Pablo Estrada 
 wrote:

> Hi Michael,
> For the Metrics API, the way to programatically query the value of a
> metric is by using the MetricResults.queryMetrics method. You get the
> MetricResults object from the PipelineResult object, and query it like so:
>
> PipelineResult res = p.run();
> MetricQueryResults metricResult = res.metrics().queryMetrics();
>
> The queryMetrics method takes in a MetricsFilter instance.
>
> Not all runners support this operation. For the dataflow runner, PR
> 2896[1] should add it.
>
> Let me know if you need more help with this.
> Best
> -P.
>
> [1] - https://github.com/apache/beam/pull/2896
>
> On Tue, May 9, 2017 at 3:48 PM Michael Luckey 
> wrote:
>
>> Hi,
>>
>> currently we are evaluating a migration from 0.6.0 to current. We
>> encountered the following problem, which we currently not sure, how to 
>> best
>> solve.
>>
>> Say we have a DoFn, which is using Aggregators, e.g.
>>
>> ctr = createAggregator("someCounter", Sum.ofLongs());
>>
>>  We were testing them with DoFn-Tester like
>>
>> final DoFnTest

Re: BigQuery join in Apache beam

2017-05-03 Thread Dan Halperin
Hi Prabeesh,

The underlying Beam primitive you use for Join is CoGroupByKey – this takes
N different collections KV<K, V1>, KV<K, V2>, ..., KV<K, VN> and produces
one collection KV<K, [Iterable<V1>, Iterable<V2>, ..., Iterable<VN>]>. This
is a compressed representation of a Join result, in that you can expand it
to a full outer join, you can implement inner join, and you can implement
lots of other join algorithms.

There is also a Join library that does this under the hood:
https://github.com/apache/beam/tree/master/sdks/java/extensions/join-library
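
For example, something along these lines with the join library (a sketch --
the inputs just need to be keyed KV collections, built here with Create):

PCollection<KV<String, Long>> orders =
    p.apply("Orders", Create.of(KV.of("alice", 10L), KV.of("bob", 20L)));
PCollection<KV<String, String>> countries =
    p.apply("Countries", Create.of(KV.of("alice", "US"), KV.of("bob", "DE")));

// Inner join on the shared key: each output is KV<key, KV<leftValue, rightValue>>.
PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(orders, countries);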


Dan

On Wed, May 3, 2017 at 6:30 AM, Prabeesh K.  wrote:

> Hi Dan,
>
> Sorry for the late response.
>
> I agreed with you for the use cases that you mentioned.
>
> Advice me and please share if there is any sample code to join two data
> sets in Beam that are sharing some common keys.
>
> Regards,
> Prabeesh K.
>
> On 6 February 2017 at 10:38, Dan Halperin  wrote:
>
>> Definitely, using BigQuery for what BigQuery is really good at (big scans
>> and cost-based joins) is nearly always a good idea. A strong endorsement of
>> Ankur's answer.
>>
>> Pushing the right amount of work into a database is an art, however --
>> there are some scenarios where you'd rather scan in BQ and join in Beam
>> because the join result is very large and you can better filter it in Beam,
>> or because you need to do some pre-join-filtering based on an external API
>> call (and you don't want to load the results of that API call into
>> BigQuery)...
>>
>> I've only seen a few, rare, cases of the latter.
>>
>> Thanks,
>> Dan
>>
>> On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K.  wrote:
>>
>>> Hi Ankur,
>>>
>>> Thank you for your response.
>>>
>>> On 5 February 2017 at 23:59, Ankur Chauhan  wrote:
>>>
>>>> I have found doing joins in bigquery using sql is a lot faster and
>>>> easier to iterate upon.
>>>>
>>>>
>>>> Ankur Chauhan
>>>> On Sat, Feb 4, 2017 at 22:05 Prabeesh K.  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Which is the better way to join two tables in apache beam?
>>>>>
>>>>> Regards,
>>>>> Prabeesh K.
>>>>>
>>>>
>>>
>>
>


Re: Using R in Data Flow

2017-04-12 Thread Dan Halperin
Hi Anant,

The blog post about R-on-Dataflow should work for R-on-Beam -- it just
predates Beam; there is no longer any Dataflow Python that isn't based on
Beam :)

What have you tried?

Thanks,
Dan

On Mon, Apr 10, 2017 at 11:18 PM, Anant Bhandarkar <
anant.bhandar...@impactanalytics.co> wrote:

> Hi,
> We are using Big Query for our querying needs.
> We are also looking to use Dataflow with some of the statistical
> libraries. We are using R libraries to build these statistical models.
>
> We are looking to run our data through the statistical models such as ELM
> , GAM, ARIMA etc. We see that python doesn't have all these libraries which
> we get as Cran packages in R.
>
> We have seen this example where there is a possibility to run R on data
> flow.
>
>
> https://medium.com/google-cloud/cloud-dataflow-can-autoscale
> -r-programs-for-massively-parallel-data-processing-492b57bd732d
> https://github.com/gregmcinnes/incubator-beam/blob/python-
> sdk/sdks/python/apache_beam/examples/complete/wordlength/
> wordlength_R/wordlength_R.py
>
> If we are able to use parallelization provided by Dataflow along with R
> libraries this would be a great for us as a team and also the whole Data
> science community which relies on R Packages.
>
> We would need some help from the Beam to achieve this.
>
> I see that it will be a very good use case for the whole of data science
> community that will enable usage of both Python and R on Beam and Dataflow.
>
> Regards,
> Anant
>
>


Re: UserCodeExecption

2017-04-12 Thread Dan Halperin
Hi Anil,

It looks to me like you are not using Apache Beam -- e.g., I
see com.google.cloud.dataflow.sdk.util.UserCodeException which is not a
Beam class. Additionally, the documentation on cloud.google.com is not for
Apache Beam -- for that, see https://beam.apache.org -- e.g., the Beam Java
Quickstart.

Can you please try with Apache Beam 0.6.0 release? And make sure that your
pipeline works in the DirectRunner before trying it on any of the other
runners, including DataflowRunner.

Thanks,
Dan

On Wed, Apr 12, 2017 at 5:07 AM, Anil Srinivas  wrote:

> Hi,
>  Yes I have activated the Dataflow api and it is getting executed in the
> local. The problem is when I try to execute on the Cloud Dataflow service.
>
>
>
> Thanks and Regards
> Anil
>
> On Apr 12, 2017 17:33, "Alexandre Crayssac" 
> wrote:
>
>> Hi,
>>
>> Have you activated the Dataflow API in the GCP console ?
>>
>> Regards,
>>
>> Alexandre
>>
>> On Wed, Apr 12, 2017 at 11:52 AM, Anil Srinivas <
>> anil.b...@impactanalytics.co> wrote:
>>
>>> Hi,
>>> I have been trying to execute the example word-count on the cloud
>>> dataflow service as per the documentation in https://cloud.google.com/da
>>> taflow/docs/quickstarts/quickstart-java-maven. Even after following the
>>> exact same steps as mentioned in the document, I am getting the following
>>> error:
>>>
>>> Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException:
>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>> com.google.cloud.dataflow.sdk.util.UserCodeException:
>>> java.io.IOException: com.google.api.client.googleap
>>> is.json.GoogleJsonResponseException: 400 Bad Request
>>> {
>>>   "code" : 400,
>>>   "errors" : [ {
>>> "domain" : "global",
>>> "message" : "No object name",
>>> "reason" : "required"
>>>   } ],
>>>   "message" : "No object name"
>>> }
>>> at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(Us
>>> erCodeException.java:35)
>>> at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(
>>> UserCodeException.java:40)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCo
>>> deException(DoFnRunnerBase.java:369)
>>> at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokePr
>>> ocessElement(SimpleDoFnRunner.java:51)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processEle
>>> ment(DoFnRunnerBase.java:139)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.p
>>> rocessElement(SimpleParDoFn.java:188)
>>> at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDo
>>> Fn.processElement(ForwardingParDoFn.java:42)
>>> at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerL
>>> oggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperat
>>> ion.process(ParDoOperation.java:55)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.OutputRecei
>>> ver.process(OutputReceiver.java:52)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1
>>> .output(SimpleParDoFn.java:158)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:288)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProces
>>> sContext.output(DoFnRunnerBase.java:450)
>>> at com.google.cloud.dataflow.sdk.runners.worker.CombineValuesFn
>>> Factory$ExtractOutputDoFn.processElement(CombineValuesFnFact
>>> ory.java:270)
>>> at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokePr
>>> ocessElement(SimpleDoFnRunner.java:49)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processEle
>>> ment(DoFnRunnerBase.java:139)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.p
>>> rocessElement(SimpleParDoFn.java:188)
>>> at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDo
>>> Fn.processElement(ForwardingParDoFn.java:42)
>>> at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerL
>>> oggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperat
>>> ion.process(ParDoOperation.java:55)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.OutputRecei
>>> ver.process(OutputReceiver.java:52)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1
>>> .output(SimpleParDoFn.java:158)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:288)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:284)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProces
>>> sContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
>>> at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndComb
>>> ineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:203)
>>> at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndComb
>>> ineDoFn.processElement(GroupAlsoByWindowsAndCombineDoF

Re: How to skip processing on failure at BigQueryIO sink?

2017-04-11 Thread Dan Halperin
I believe this is BEAM-190, which is actually being worked on today.
However, it will probably not be ready in time for the first stable release.

https://issues.apache.org/jira/browse/BEAM-190
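
A sketch of the pre-filtering Lukasz describes below -- validate rows against
the known schema in a DoFn before BigQueryIO ever sees them (the required
field names here are placeholders):

static class DropInvalidRows extends DoFn<TableRow, TableRow> {
  private final Counter dropped = Metrics.counter(DropInvalidRows.class, "dropped-rows");

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    // Placeholder check: keep only rows that can match the destination schema.
    if (row.get("id") != null && row.get("ts") != null) {
      c.output(row);
    } else {
      dropped.inc();
    }
  }
}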

On Tue, Apr 11, 2017 at 7:52 AM, Lukasz Cwik  wrote:

> Have you thought of fetching the schema upfront from BigQuery and
> prefiltering out any records in a preceeding DoFn instead of relying on
> BigQuery telling you that the schema doesn't match?
>
> Otherwise you are correct in believing that you will need to update
> BigQueryIO to have the retry/error semantics that you want.
>
> On Tue, Apr 11, 2017 at 1:12 AM, Josh  wrote:
>
>> What I really want to do is configure BigQueryIO to log an error and skip
>> the write if it receives a 4xx response from BigQuery (e.g. element does
>> not match table schema). And for other errors (e.g. 5xx) I want it to retry
>> n times with exponential backoff.
>>
>> Is there any way to do this at the moment? Will I need to make some
>> custom changes to BigQueryIO?
>>
>>
>>
>> On Mon, Apr 10, 2017 at 7:11 PM, Josh  wrote:
>>
>>> Hi,
>>>
>>> I'm using BigQueryIO to write the output of an unbounded streaming job
>>> to BigQuery.
>>>
>>> In the case that an element in the stream cannot be written to BigQuery,
>>> the BigQueryIO seems to have some default retry logic which retries the
>>> write a few times. However, if the write fails repeatedly, it seems to
>>> cause the whole pipeline to halt.
>>>
>>> How can I configure beam so that if writing an element fails a few
>>> times, it simply gives up on writing that element and moves on without
>>> affecting the pipeline?
>>>
>>> Thanks for any advice,
>>> Josh
>>>
>>
>>
>


Re: BigQueryIO - Why is CREATE_NEVER not supported when using a tablespec?

2017-04-07 Thread Dan Halperin
Hi Josh,
You raise a good point. I think we had put this check in (long before
partitioned tables existed) because we need a schema to create a table and we
assumed the number of tables would be unbounded. But now it's an outdated
check, overly conservative, and it probably should be removed.
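
For reference, the kind of usage Josh describes below would look something
like this (a sketch only -- the exact .to(...) overload varies across SDK
versions, rows is a placeholder PCollection<TableRow>, and the
project/dataset names are made up):

rows.apply(BigQueryIO.writeTableRows()
    .to((ValueInSingleWindow<TableRow> v) -> {
      // Assumes fixed (e.g. daily) windows, so the window is an IntervalWindow;
      // target a date partition of one existing table via the $YYYYMMDD decorator.
      String day = DateTimeFormat.forPattern("yyyyMMdd")
          .print(((IntervalWindow) v.getWindow()).start());
      return new TableDestination("my-project:dataset.my_table$" + day, null);
    })
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));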

Would you like to send a PR to fix this?

Thanks,
Dan


On Fri, Apr 7, 2017 at 10:03 AM, Josh  wrote:

> Hi all,
>
> I have a use case where I want to stream into BigQuery, using a tablespec
> but with CreateDisposition.CREATE_NEVER. I want to partition/shard my data
> by date, and use BigQuery's date partitioning feature within a single table
> (rather than creating a new BigQuery table for every day). In this case
> writes would be made to a partition in a single table, e.g.
> `my-project:dataset.my_table$20170407`, and in my tablespec I would just
> be choosing the partition decorator using the window.
>
> Unfortunately this doesn't seem possible with BigQueryIO at the moment,
> because it requires me to use CreateDisposition.CREATE_IF_NEEDED. I can't
> use CreateDisposition.CREATE_IF_NEEDED because it requires me to provide
> a table schema and my BigQuery schema isn't available at compile time.
>
> Is there any good reason why CREATE_NEVER is not allowed when using a
> tablespec?
>
> Thanks,
> Josh
>


Re: Reading/ writing xml file hangs indefinitely

2017-04-07 Thread Dan Halperin
Hi Richard,

I wonder if you're being hit by
https://issues.apache.org/jira/browse/BEAM-1309 -- namely, that the entire
/tmp directory might be being traversed.

As a sanity check, can you try moving your test file into a more specific
folder, like /tmp/beam/test_input/input.xml

If this resolves your issue, it's a good argument for prioritizing fixing
that issue ;)

Dan

On Fri, Apr 7, 2017 at 5:37 AM, Richard Hanson  wrote:

>
> On 06 April 2017 at 19:53 Dan Halperin  wrote:
>
> Hi Richard,
>
> Can you share a little more info about your environment? Here's a
> smattering of questions for which answers may be useful.
>
> * What runner are you using?
>
> I don't specify any runner so I believe it should be use direct runner.
>
> * What version of the SDK?
>
> Apache Beam version is 0.6.0 (beam-sdks-java-core
> and beam-runners-direct-java)
>
> * Does this reproduce in the DirectRunner?
>
> This problem I believe happens while running DirectRunner.
>
> * Can you share a full reproduction? (e.g., in a github gist)?
>
> JDK: 1.8.0_121
>
> Scala: 2.12.1
>
> sbt: 0.13.13
>
>
> Below is the sample xml file
>
> <?xml version="1.0" encoding="UTF-8"?>
> <customers>
> <customer id="1">
> <age>33</age>
> <name>John Smith</name>
> </customer>
> </customers>
>
>
> The sample record object.
>
> @XmlRootElement
> class Customer {
>
> private var name: String = ""
>
> private var age: Int = 0
>
> private var id: Int = -1
>
> def getName():String = name
>
> @XmlElement
> def setName(name: String) = this.name = name
>
> def getAge(): Int = age
>
> @XmlElement
> def setAge(age: Int) = this.age = age
>
> def getId(): Int = id
>
> @XmlAttribute
> def setId(id: Int) = this.id = id
>
> }
>
> Pipeline procedure of the code.
>
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
> val source = XmlSource.from[Customer](
> new File("customers.xml").toPath.toString
> ).withRootElement("customers").withRecordElement("customer").
> withRecordClass(classOf[Customer])
>
> val sink = XmlSink.write().toFilenamePrefix("xmlout").
> ofRecordClass(classOf[Customer]).
> withRootElement("customers")
>
> p.apply(Read.from(source)).apply(Write.to(sink))
>
> p.run.waitUntilFinish
>
>
> * What is happening on the machine(s) executing the job? Is there high
> CPU? Is the disk active? Etc.
>
> There is a high cpu usage which keeps at 99.x% when Java process is
> executing (when checking with top command).
>
> 7624 user  20   0 2837.6m 582.5m  23.5m S 99.3 11.4   2:42.11 java
>
> Monitoring with iotop shows disk io are (mostly) often performed by system
> processes e.g. kworker. Only seeing once or twice Java process (the only
> user process that runs on the machine) is doing disk io.
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
>
>
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
> 7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]
>
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
>
> 7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
> 7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
>
> Thanks,
> Dan
>
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson 
> wrote:
>
> I am testing apache beam to read/ write xml files. But I encounter a
> problem that even the code is just to read a single xml file and write it
> out without doing any transformation, the process seems to hang
> indefinitely. The output looks like below:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io
> .FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write
> operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
>
> The code basically do the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPa
> th.toString).withRootElement("RootE").withRecordElement("
> Record").withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFil
> enamePrefix("xml").ofRecordClass(classOf[Record]).
> withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
>
> What part may be missing in my program?
>
> Thanks
>
>
>


Re: Reading/ writing xml file hangs indefinitely

2017-04-06 Thread Dan Halperin
Hi Richard,

Can you share a little more info about your environment? Here's a
smattering of questions for which answers may be useful.

* What runner are you using?
* What version of the SDK?
* Does this reproduce in the DirectRunner?
* Can you share a full reproduction? (e.g., in a github gist)?
* What is happening on the machine(s) executing the job? Is there high CPU?
Is the disk active? Etc.

Thanks,
Dan

On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson  wrote:

> I am testing apache beam to read/ write xml files. But I encounter a
> problem that even the code is just to read a single xml file and write it
> out without doing any transformation, the process seems to hang
> indefinitely. The output looks like below:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO 
> org.apache.beam.sdk.io.FileBasedSource
> - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write
> operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
>
> The code basically do the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").
> toPath.toString).withRootElement("RootE").withRecordElement("Record").
> withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().
> toFilenamePrefix("xml").ofRecordClass(classOf[Record])
> .withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
>
> What part may be missing in my program?
>
> Thanks
>


Re: automatic runner inference

2017-04-03 Thread Dan Halperin
+Thomas

I thought, perhaps erroneously, that if there is only 1 runner on the
classpath we did this already. Though of course it wouldn't work in almost
any case -- most runners require extra command line flags.

On Mon, Apr 3, 2017 at 12:33 PM, Antony Mayi  wrote:

> Currently I need to compile my pipeline with options.setRunner(XY.class).
> is it not possible to infer the runner implicitly? ie. ultimately having
> one jar file that I can submit to any platform and it would run (currently
> I need to recompile or specify some switches before running it on different
> engine). Some kind of platform detection with potential fallback to direct
> runner.
>
> thanks,
> antony.
>


Re: Beam partitioned file reading and writing

2017-03-29 Thread Dan Halperin
Hi Billy,

Thanks for that feedback -- truly invaluable.

Would you mind filing a bug in the Flink runner component in JIRA with this
information? There may be a simple fix :)
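
For reference, the Partition shape Robert describes below looks roughly like
this (a sketch -- MyEnum and keyedRecords, a PCollection<KV<MyEnum,
GenericRecord>> produced by the read/keying step, are placeholders):

PCollectionList<KV<MyEnum, GenericRecord>> parts = keyedRecords.apply(
    Partition.of(MyEnum.values().length,
        (KV<MyEnum, GenericRecord> elem, int numPartitions) -> elem.getKey().ordinal()));

// parts.get(i) holds the records for MyEnum.values()[i]; each slice can then
// get its own AvroIO write.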

On Wed, Mar 29, 2017 at 8:06 AM, Newport, Billy 
wrote:

> We tried that Robert on flink and it’s pretty awful performance wise. We
> doubled the speed by using a new hadoopoutputformat which basically has N
> hadoopoutputformats in side it and the writeRecord looks at the enum and
> writes the record to the appropriate “inner” output format.
>
>
>
> Most of the performance problems we’ve had so far are all forcing us to
> kind of hack the data flows in unnatural ways to avoid serialization costs.
>
>
>
> For example, read a file and split in to two datasets based on a column
> predicate. Approach 1:
>
>
>
> READ FILE -> Filter 1 -> Dataset
>
> è Filter 2 -> Dataset
>
>
>
> Is slower that:
>
>
>
> READ FILE -> Filter 1 -> Dataset
>
> READ FILE -> Filter 2 -> Dataset
>
>
>
> Read the file twice is faster! Go figure.
>
>
>
>
>
>
>
> *From:* Robert Bradshaw [mailto:rober...@google.com]
> *Sent:* Tuesday, March 28, 2017 4:25 AM
> *To:* user@beam.apache.org
> *Subject:* Re: Beam partitioned file reading and writing
>
>
>
> I would simply use a Partition transform to convert the
> PCollection<KV<Enum, GenericRecord>> data to a list
> of PCollection<GenericRecord> and then write each one as desired. Either
> that or a DoFn with multiple side outputs (one for each enum value).
> There's no reason this should perform poorly on the runners I am aware of,
> and is the most straightforward (and least error-prone) solution. (I don't
> know what your original solution looked like, but if you used N filters
> rather than one DoFn with N outputs that would perform much worse.)
>
>
>
> https://beam.apache.org/documentation/programming-
> guide/#transforms-flatten-partition
> 
>
>
>
> On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian 
> wrote:
>
> We had to implement a custom sink in order to get partitioned destination.
> This is the least hacky solution, but the down-side of this approach is and
> it is a lot of code that custom sinks only work in batch mode. If you need
> to do this in streaming mode you may be able to get away with writing as a
> side-effect of a pardo, but you have to account for concurrent writes, and
> chunks might be processed more than once so you have to account for
> duplicate values in your output.
>
>
>
> On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy 
> wrote:
>
> Is there builtin support for writing partitioned Collections. For example:
>
>
>
> PCollection> data;
>
>
>
> We want to write the GenericRecords in data in to different files based on
> the enum. We’ve did this in flink by making a proxy hadoopoutputformat
> which has the N real outputformats and the write method checks the enum and
> forwards the write call for the genericrecord to the correct outputformat.
>
>
>
> Given the lack of beam parquet support, the final writer we want to use
> with beam is Avro.
>
>
>
> We used the proxy outputformat trick in flink because performance was very
> poor using a filter to split it and then a map to convert from
> Enum,GenericRecord to just GenericRecord.
>
>
>
> I’m nervous to use side outputs in beam given I think they will be
> implemented as described here which performs poorly.
>
>
>
> So basically, has anyone implemented a demuxing AvroIO.Writer?
>
>
>
> Thanks
>
>
>


Re: Apache Beam cogroup help

2017-03-28 Thread Dan Halperin
Also, as for expense -- it shouldn't be expensive: on almost every runner
(Flink included) such a ParDo will usually be run on the same machine
without any serialization.
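
For the keying step, WithKeys (which Aljoscha points to below) keeps the
boilerplate down; a sketch with a placeholder key field, where records is the
PCollection<GenericRecord> read from the file:

PCollection<KV<String, GenericRecord>> keyed = records.apply(
    WithKeys.of(new SerializableFunction<GenericRecord, String>() {
      @Override
      public String apply(GenericRecord record) {
        // Placeholder: build the key string from whatever fields form the PK.
        return String.valueOf(record.get("id"));
      }
    }));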

On Wed, Mar 22, 2017 at 12:25 PM, Aljoscha Krettek 
wrote:

> You can use WithKeys for that: https://beam.apache.org/
> documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/
> transforms/WithKeys.html
>
> Best,
> Aljoscha
>
> > On 22 Mar 2017, at 19:06, Newport, Billy  wrote:
> >
> > If I’m reading a parquet or avro file though, I don’t have a
> KV, I have a Data. Do I need to run a pardo just to extract the
> keys for this to work?
> >
> > PCollection data;
> > PCollection> keyedData = “data par do’ed to
> create KV for each GenericRecord, extracting possibly multiple field PKs
> encoded as a string”
> >
> > Then do the stuff below. This seems pretty expensive (serialization
> wise) compared with the flink Keyextractor for example or is it similar in
> practice?
> >
> > Thanks Thomas.
> >
> > From: Thomas Groh [mailto:tg...@google.com]
> > Sent: Wednesday, March 22, 2017 1:53 PM
> > To: user@beam.apache.org
> > Subject: Re: Apache Beam cogroup help
> >
> > This would be implemented via a CoGroupByKey (https://beam.apache.org/
> documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/
> transforms/join/CoGroupByKey.html)
> >
> > Your transform logic will be mostly the same; after applying the
> extraction (the right side of k1 and k2 in your example), you should have
> two PCollections of KVs -
> >
> > PCollection<KV<Key, Data1>> k1;
> > PCollection<KV<Key, Data2>> k2;
> >
> > You can construct a KeyedPCollectionTuple containing the two
> PCollections:
> >
> > final TupleTag<Data1> data1Tag = new TupleTag<>();
> > final TupleTag<Data2> data2Tag = new TupleTag<>();
> > KeyedPCollectionTuple<Key> coGroupTuple = KeyedPCollectionTuple.of(data1Tag,
> k1).and(data2Tag, k2);
> >
> > Then apply the CoGroupByKey:
> >
> > PCollection<KV<Key, CoGbkResult>> coGrouped = coGroupTuple.apply(
> CoGroupByKey.create());
> >
> > Then you can run an arbitrary ParDo to combine the elements as
> appropriate. You'll need to reuse the TupleTags created above to extract
> out the per-PCollection outputs. As a simple example where the elements
> have a shared supertype CombinedData, and you'd like to add them to a
> single output list:
> >
> > PCollection<List<CombinedData>> combined =
> coGrouped.apply(ParDo.of(new DoFn<KV<Key, CoGbkResult>, List<CombinedData>>() {
> >   @ProcessElement
> >   public void process(ProcessContext context) {
> > List<CombinedData> all = new ArrayList<>();
> > for (Data1 d1 : context.element().getValue().getAll(data1Tag)) {
> >   all.add(d1);
> > }
> > for (Data2 d2 : context.element().getValue().getAll(data2Tag)) {
> >   all.add(d2);
> > }
> > context.output(all);
> >   }
> > }));
> >
> > On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy 
> wrote:
> > Trying to port flink code to Apache Beam but I’m having trouble decoding
> the documentation.
> >
> > I have flink code which looks like:
> >
> > DataSet d1 = Read parquet
> > DataSet d2 = Read Avro
> > KeyExtractor k1 = … (extracts an object containing the
> key fields from d1 records)
> > KeyExtractor k2 = … (extracts an object containing the
> key fields from d2 records)
> >
> > CoGroup grouper = (combines
> values for equal keys in to a combined list for that key)
> >
> > DataSet combined = d1.coGroup(d2).where(k1).
> equalTo(k2).with(grouper)
> >
> > Whats the beam equivalent?
> >
> > Thanks
>
>


Re: Regarding Beam Slack Channel

2017-03-27 Thread Dan Halperin
There are 106 members in the channel right now. I think the problem is the
number of outstanding invitations. (Folks, if you've got an invite --
please accept!)

On Mon, Mar 27, 2017 at 4:39 AM, Jean-Baptiste Onofré 
wrote:

> Hi,
>
> As said during the week end, we reached the max number of members on the
> Slack channel (90). We are checking different ways to increase that.
> I will add you as soon as it's OK.
> Sorry for the delay,
>
> Regards
> JB
>
>
> On 03/27/2017 11:09 AM, Chenchong Qin wrote:
>
>> Hello
>>
>> Can someone please add me to the Beam slack channel?
>>
>> Thanks.
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-21 Thread Dan Halperin
[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no
checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu  wrote:

> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
> Can it restore during job restart? --Not test the runner in streaming for
> some time.
>
> Regarding to data-completeness, I would use at-most-once when few data
> missing(mostly tasknode failure) is tolerated, compared to the performance
> cost introduced by 'state'/'checkpoint'.
>
> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  wrote:
>
> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  wrote:
> >
> > > Move discuss to dev-list
> > >
> > > Savepoint in Flink, also checkpoint in Spark, should be good enough to
> > > handle this case.
> > >
> > > When people don't enable these features, for example only need
> > at-most-once
> > >
> > The Spark runner forces checkpointing on any streaming (Beam)
> application,
> > mostly because it uses mapWithState for reading from UnboundedSource and
> > updateStateByKey form GroupByKey - so by design, Spark runner is
> > at-least-once. Generally, I always thought that applications that require
> > at-most-once are more focused on processing time only, as they only care
> > about whatever get's ingested into the pipeline at a specific time and
> > don't care (up to the point of losing data) about correctness.
> > I would be happy to hear more about your use case.
> >
> > > semantic, each unbounded IO should try its best to restore from last
> > > offset, although CheckpointMark is null. Any ideas?
> > >
> > > Mingmin
> > >
> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin 
> > wrote:
> > >
> > > > hey,
> > > >
> > > > The native Beam UnboundedSource API supports resuming from checkpoint
> > --
> > > > that specifically happens here
> > > > <
> > > https://github.com/apache/beam/blob/master/sdks/java/io/kafk
> > a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
> > > when
> > > > the KafkaCheckpointMark is non-null.
> > > >
> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
> > most
> > > > recent savepoint upon restore.
> > > >
> > > > There shouldn't be any "special" Flink runner support needed, nor is
> > the
> > > > State API involved.
> > > >
> > > > Dan
> > > >
> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > > > wrote:
> > > >
> > > >> Would not it be Flink runner specific ?
> > > >>
> > > >> Maybe the State API could do the same in a runner agnostic way (just
> > > >> thinking loud) ?
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> > > >>
> > > >>> From KafkaIO itself, looks like it either start_from_beginning or
> > > >>> start_from_latest. It's designed to leverage
> > > >>> `UnboundedSource.CheckpointMark`
> > > >>> during initialization, but so far I don't see it's provided by
> > runners.
> > > >>> At the
> > > >>> moment Flink savepoints is a good option, created a JIRA(BEAM-1775
> > > >>> <https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it
> in
> > > >>> Kafka

Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-21 Thread Dan Halperin
hey,

The native Beam UnboundedSource API supports resuming from checkpoint --
that specifically happens here:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674
when the KafkaCheckpointMark is non-null.

The FlinkRunner should be providing the KafkaCheckpointMark from the most
recent savepoint upon restore.

There shouldn't be any "special" Flink runner support needed, nor is the
State API involved.

Dan

On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré 
wrote:

> Would not it be Flink runner specific ?
>
> Maybe the State API could do the same in a runner agnostic way (just
> thinking loud) ?
>
> Regards
> JB
>
> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>
>> From KafkaIO itself, looks like it either start_from_beginning or
>> start_from_latest. It's designed to leverage
>> `UnboundedSource.CheckpointMark`
>> during initialization, but so far I don't see it's provided by runners.
>> At the
>> moment Flink savepoints is a good option, created a JIRA(BEAM-1775
>> )  to handle it in
>> KafkaIO.
>>
>> Mingmin
>>
>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek > > wrote:
>>
>> Hi,
>> Are you using Flink savepoints [1] when restoring your application?
>> If you
>> use this the Kafka offset should be stored in state and it should
>> restart
>> from the correct position.
>>
>> Best,
>> Aljoscha
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> setup/savepoints.html
>> > 3/setup/savepoints.html>
>> > On 21 Mar 2017, at 01:50, Jins George > > wrote:
>> >
>> > Hello,
>> >
>> > I am writing a Beam pipeline(streaming) with Flink runner to
>> consume data
>> from Kafka and apply some transformations and persist to Hbase.
>> >
>> > If I restart the application ( due to failure/manual restart),
>> consumer
>> does not resume from the offset where it was prior to restart. It
>> always
>> resume from the latest offset.
>> >
>> > If I enable Flink checkpionting with hdfs state back-end, system
>> appears
>> to be resuming from the earliest offset
>> >
>> > Is there a recommended way to resume from the offset where it was
>> stopped ?
>> >
>> > Thanks,
>> > Jins George
>>
>>
>>
>>
>> --
>> 
>> Mingmin
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: collect to local

2017-02-24 Thread Dan Halperin
It may also be worth looking at how Scio implements this. The Scio version
based on Beam is here: https://github.com/spotify/scio/tree/apache-beam

And they have given some good talks.
https://www.slideshare.net/sinisalyh/scio-a-scala-api-for-google-cloud-dataflow-apache-beam
I believe the "closeAndCollect" operator in Scio is the one along the lines
of what Amit is discussing.
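
For the Aggregator/Metrics route Amit describes below, getting a value back
into the local program looks roughly like this (a sketch; "my-namespace" /
"myCount" stand for a counter incremented somewhere in the pipeline, and the
accessor names are from the 2.0-era Metrics API):

PipelineResult result = pipeline.run();
result.waitUntilFinish();

MetricQueryResults metrics = result.metrics().queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("my-namespace", "myCount"))
        .build());

for (MetricResult<Long> counter : metrics.counters()) {
  // committed() is the consistent value; attempted() may over-count on retries.
  System.out.println(counter.name() + ": " + counter.committed());
}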

On Mon, Feb 20, 2017 at 2:32 AM, Amit Sela  wrote:

> You could consider using Aggregators or Metrics (Metrics are still
> experimental and currently only supported by the Direct and Spark runner).
>
> Simply add a DoFn that reports to the Aggregator - see here
> 
>  how
> to use Aggregators in DoFn.
> Then query
> 
> the result in the PipelineResult.
>
> Would this work for your use case ?
>
>
> On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi  wrote:
>
>>
>> Thanks Amit,
>>
>> I fully understand the controversy of trying to collect Big data into
>> local memory... But lets say the data is result of some reduce operation so
>> driver OOM is not a problem and further processing needs to continue in the
>> driver and getting it there via Kafka is an overkill (ie the system would
>> otherwise not use Kafka at all so this would mean new dependency). I get
>> the point that I could implement all the rest on PCollection but once
>> (significant) part of the pipeline doesn't need big-data/map-reduce
>> tool-set, it would just be way easier implementing it locally.
>>
>> Antony.
>> On Monday, 20 February 2017, 10:53, Amit Sela 
>> wrote:
>>
>>
>> Hi Antony,
>>
>> Generally, PCollections are a distributed bag of elements, just like
>> Spark RDDs (for batch).
>> Assuming you have a distributed collection, you probably wouldn't want to
>> materialize it locally, and even if it's a global count (result) of some
>> kind (guaranteeing to avoid OOM in your "driver") you'd probably want to
>> write it to a Sink of some kind - Kafka, HDFS, etc.
>>
>> I'm curious how would you use "collect()" or materializing the
>> PCollection in the driver program ? what did you have in mind ?
>>
>> You can implement a custom Sink - Spark runner has it's own ConsoleIO to
>> print to screen using Spark's print() but I use it for dev iterations and
>> it clearly works only for the Spark runner.
>>
>> Amit
>>
>>
>> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré 
>> wrote:
>>
>> Hi Antony,
>>
>> The Spark runner deals with caching/persist for you (analyzing how many
>> time the same PCollection is used).
>>
>> For the collect(), I don't fully understand your question.
>>
>> If if you want to process elements in the PCollection, you can do simple
>> ParDo:
>>
>> .apply(ParDo.of(new DoFn() {
>>@ProcessElement
>>public void processElements(ProcessContext context) {
>>  T element = context.element();
>>  // do whatever you want
>>}
>> })
>>
>> Is it not what you want ?
>>
>> Regards
>> JB
>>
>> On 02/20/2017 10:30 AM, Antony Mayi wrote:
>> > Hi,
>> >
>> > what is the best way to fetch content of PCollection to local
>> > memory/process (something like calling .collect() on Spark rdd)? Do I
>> > need to implement custom Sink?
>> >
>> > Thanks for any clues,
>> > Antony.
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>>
>>


Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

2017-02-17 Thread Dan Halperin
This looks like the same issue that Scio encountered with the Google API
Client libraries: https://github.com/spotify/scio/issues/388

I think that if the `value` is null, BigQuery expects you to omit the key
rather than include it with a null value.

On Fri, Feb 17, 2017 at 11:38 PM, Dan Halperin  wrote:

> It looks to me like the NPE comes from the Google API client library. It
> looks like maybe you are creating an invalid tablerow (null key? null
> value?)
>
> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.
> java:419)
>
> Dan
>
> On Fri, Feb 17, 2017 at 3:19 PM, Kenneth Knowles  wrote:
>
>> Hi Tobias,
>>
>> The specific error there looks like you have a forbidden null somewhere
>> deep inside the output of logLine.toTableRow(). Hard to say more with this
>> information.
>>
>> Kenn
>>
>> On Fri, Feb 17, 2017 at 4:46 AM, Tobias Feldhaus <
>> tobias.feldh...@localsearch.ch> wrote:
>>
>>> It seems like this is caused by the fact that the workaround I am using
>>> to write
>>> daily-partitioned tables in batch mode does not work.
>>>
>>> My problem is that with more than 1000 days, the date-sharded table in
>>> BQ will
>>> be too large to be converted automatically via a simple “bq partition”
>>> command
>>> into a partitioned table as such table cannot have more than 1000 days.
>>>
>>> So the solution will be a divide-and-conquer strategy I guess.
>>>
>>> On 17.02.17, 11:36, "Tobias Feldhaus" 
>>> wrote:
>>>
>>> Hello,
>>>
>>> could it be, that it's no longer possible to run pipelines with a
>>> BigQuery sink
>>> locally on the dev machine? I migrated a "Read JSON from GCS, parse
>>> and
>>> write to BQ" pipeline to Apache Beam 0.5.0 from the Dataflow SDK.
>>> All tests are green, the pipeline runs successfully on the Dataflow
>>> service with
>>> the test files, but locally with the DirectRunner I get a NPE.
>>>
>>> It happens right after I create the TableRow element which I even
>>> double
>>> checked not to be null. Even when I artificially create a LogLine
>>> element in this step without taking the one from the input the NPE
>>> is thrown:
>>>
>>>
>>> static class Outputter extends DoFn<LogLine, TableRow> {
>>> (...)
>>> LogLine logLine = c.element();
>>>
>>> TableRow tableRow = logLine.toTableRow();
>>> tableRow.set("ts", c.timestamp().toString());
>>>
>>> if (c != null && tableRow != null){
>>> try {
>>>
>>> c.output(tableRow);
>>> }
>>> catch(NullPointerException e){
>>> LOG.error("catched NPE");
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> The corrensponding Stacktrace looks like this:
>>>
>>> ERROR: catched NPE
>>> java.lang.NullPointerException
>>> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.
>>> java:419)
>>> at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>>> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.
>>> java:419)
>>> at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>>> at java.util.Arrays.hashCode(Arrays.java:4146)
>>> at java.util.Objects.hash(Objects.java:128)
>>> at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlo
>>> balWindow.hashCode(WindowedValue.java:409)
>>> at java.util.HashMap.hash(HashMap.java:338)
>>> at java.util.HashMap.get(HashMap.java:556)
>>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>>> collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
>>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>>> collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
>>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>>> collect.HashMultimap.put(HashMultimap.java:49)
>>> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFac
>>> tory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBun
>>> dleFactory.java:112)
>>> at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputMa
>>

Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

2017-02-17 Thread Dan Halperin
It looks to me like the NPE comes from the Google API client library. It
looks like maybe you are creating an invalid TableRow (null key? null
value?).

at com.google.api.client.util.ArrayMap$Entry.hashCode(
ArrayMap.java:419)
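
For the record, a minimal defensive sketch (the helper and its Map-based input
are hypothetical; adapt it to however LogLine exposes its fields) that only
copies non-null keys and values into the TableRow, since the trace shows
ArrayMap$Entry.hashCode failing on a null entry:

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;

static TableRow toTableRowSafely(Map<String, Object> fields) {
  TableRow row = new TableRow();
  for (Map.Entry<String, Object> entry : fields.entrySet()) {
    // Skip null keys/values; they are what trips ArrayMap$Entry.hashCode.
    if (entry.getKey() != null && entry.getValue() != null) {
      row.set(entry.getKey(), entry.getValue());
    }
  }
  return row;
}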

Dan

On Fri, Feb 17, 2017 at 3:19 PM, Kenneth Knowles  wrote:

> Hi Tobias,
>
> The specific error there looks like you have a forbidden null somewhere
> deep inside the output of logLine.toTableRow(). Hard to say more with this
> information.
>
> Kenn
>
> On Fri, Feb 17, 2017 at 4:46 AM, Tobias Feldhaus <
> tobias.feldh...@localsearch.ch> wrote:
>
>> It seems like this is caused by the fact that the workaround I am using
>> to write
>> daily-partitioned tables in batch mode does not work.
>>
>> My problem is that with more than 1000 days, the date-sharded table in BQ
>> will
>> be too large to be converted automatically via a simple “bq partition”
>> command
>> into a partitioned table, as such a table cannot have more than 1000 days.
>>
>> So the solution will be a divide-and-conquer strategy I guess.
>>
>> On 17.02.17, 11:36, "Tobias Feldhaus" 
>> wrote:
>>
>> Hello,
>>
>> could it be that it's no longer possible to run pipelines with a
>> BigQuery sink
>> locally on the dev machine? I migrated a "Read JSON from GCS, parse
>> and
>> write to BQ" pipeline to Apache Beam 0.5.0 from the Dataflow SDK.
>> All tests are green, the pipeline runs successfully on the Dataflow
>> service with
>> the test files, but locally with the DirectRunner I get a NPE.
>>
>> It happens right after I create the TableRow element, which I have
>> double-checked is not null. Even when I artificially create a LogLine
>> element in this step, without taking the one from the input, the NPE is
>> thrown:
>>
>>
>> static class Outputter extends DoFn<LogLine, TableRow> {
>> (...)
>> LogLine logLine = c.element();
>>
>> TableRow tableRow = logLine.toTableRow();
>> tableRow.set("ts", c.timestamp().toString());
>>
>> if (c != null && tableRow != null){
>> try {
>>
>> c.output(tableRow);
>> }
>> catch(NullPointerException e){
>> LOG.error("caught NPE");
>> e.printStackTrace();
>> }
>> }
>>
>> The corresponding stack trace looks like this:
>>
>> ERROR: caught NPE
>> java.lang.NullPointerException
>> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.
>> java:419)
>> at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.
>> java:419)
>> at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>> at java.util.Arrays.hashCode(Arrays.java:4146)
>> at java.util.Objects.hash(Objects.java:128)
>> at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlo
>> balWindow.hashCode(WindowedValue.java:409)
>> at java.util.HashMap.hash(HashMap.java:338)
>> at java.util.HashMap.get(HashMap.java:556)
>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>> collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>> collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
>> at org.apache.beam.runners.direct.repackaged.com.google.common.
>> collect.HashMultimap.put(HashMultimap.java:49)
>> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFac
>> tory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBun
>> dleFactory.java:112)
>> at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputMa
>> nager.output(ParDoEvaluator.java:198)
>> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.ou
>> tputWindowedValue(SimpleDoFnRunner.java:352)
>> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessCon
>> text.output(SimpleDoFnRunner.java:553)
>> at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter
>> .processElement(FrontendPipeline.java:181)
>> at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter
>> $auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
>> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessE
>> lement(SimpleDoFnRunner.java:199)
>> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement
>> (SimpleDoFnRunner.java:161)
>> at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.pro
>> cessElement(PushbackSideInputDoFnRunner.java:111)
>> at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.pro
>> cessElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
>> at org.apache.beam.runners.direct.ParDoEvaluator.processElement
>> (ParDoEvaluator.java:134)
>> at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingT
>> ransformEvaluator.processElement(DoFnLifecycleManagerRemovin
>> gTransformEval

Re: Implicit file-size limit of input files?

2017-02-17 Thread Dan Halperin
If you have shuffles between the "Partition" operator and the "Write"
operator, yes. Note that most runners will run a partition and all its
outputs in the same thread at the same time. The way you separate these is,
in many runners, by inserting a shuffle.

Alternatively, you can look at the GcsOptions
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java#L75>
to reduce the memory consumed by the network buffers used when writing to GCS,
if you want to shrink the total memory taken by each of those 1000 connections
in the same thread.
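
For example, a minimal sketch (the pipeline options are assumed to come from
command-line args, and the 8 MiB value is only an illustration) that lowers the
per-connection upload buffer via GcsOptions:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

GcsOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(GcsOptions.class);
// Shrink the GCS upload buffer from the ~64 MiB default to 8 MiB per open write.
options.setGcsUploadBufferSizeBytes(8 * 1024 * 1024);
Pipeline pipeline = Pipeline.create(options);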

On Fri, Feb 17, 2017 at 5:08 AM, Tobias Feldhaus <
tobias.feldh...@localsearch.ch> wrote:

> Hi Dan,
>
> just one follow up question, as I have completely revised my pipeline now
> and
> want to write AVRO files to GCS first (one per day). You said that
>
> by default writing to GCS uses a 64 MiB buffer so if you have 10
> partitions
> you're allocating 640 MiB, per core, just for those network
> buffers.
>
> Can I somehow optimize this? Would it be possible to partition a
> PCollection
> into 1000 partitions when using “enough” workers with “enough” memory?
>
> Tobias
>
>
> On 13.02.17, 09:42, "Tobias Feldhaus" 
> wrote:
>
> Hi Dan,
>
> Thank you for your response!
>
> The approach I am using to write per-window tables seems to work in both
> batch and streaming mode; at least, this is claimed here [0], and I have
> confirmed it with the author of that post. I also tested this with smaller
> files in my own setup.
>
> Would a shuffling operation on a non-key-value
> input look like this [1], or is there already some PTransform in the
> SDK that I
>     am not aware of?
>
> Tobias
>
> [0] http://stackoverflow.com/a/40863609/5497956
> [1] http://stackoverflow.com/a/40769445/5497956
>
> From: Dan Halperin 
> Reply-To: "user@beam.apache.org" 
> Date: Saturday, 11 February 2017 at 21:31
> To: "user@beam.apache.org" 
> Subject: Re: Implicit file-size limit of input files?
>
> Hi Tobias,
>
> There should be no specific limitations in Beam on file size or
> otherwise, obviously different runners and different size clusters will
> have different potential scalability.
>
> A few general Beam tips:
>
> * Reading from compressed files is often a bottleneck, as this work is
> not parallelizable. If you find reading from compressed files is a
> bottleneck, you may want to follow it with a shuffling operation to improve
> parallelism as most runners can run the work pre- and post-shuffle on
> different machines (with different scaling levels).
>
> * The Partition operator on its own does not improve parallelism.
> Depending on how the runner arranges the graph, when you partition N ways
> you may still execute all N partitions on the same machine. Again, a
> shuffling operator here will often let runners execute the N branches
> separately.
>
>(There are known issues for certain sinks when N is high. For
> example, by default writing to GCS uses a 64 MiB buffer so if you have 10
> partitions you're allocating 640 MiB, per core, just for those network
> buffers.)
>
> It sounds like you may be trying to use the "to(Partition function)"
> method of writing per window tables. The javadoc for BigQueryIO.Write
> clearly documents (https://github.com/apache/
> beam/blob/master/sdks/java/io/google-cloud-platform/src/
> main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L232) that
> it is not likely to work in "batch" runners.
>
> I suggest reaching out to Google Cloud via the recommendations at
> https://cloud.google.com/dataflow/support if you have issues specific to
> the Google Cloud Dataflow runner.
>
> Dan
>
> On Fri, Feb 10, 2017 at 3:18 AM, Tobias Feldhaus <
> tobias.feldh...@localsearch.ch> wrote:
> Addendum: When running in streaming mode with version 0.5 of the SDK,
> the elements are basically stuck before getting emitted [0], but the
> whole process starts and runs up to the point where the memory is most
> likely full (GC overhead error) and it crashes [0].
>
> It seems like the Reshuffle that is taking place prevents any output
> from happening.
> To get rid of that, I would need to find another way to write to a
> partition in
> BigQuery in batch mode without using the workaround that is described
> here [1],
> but I don't know how.
>
> [0] https://puu.sh/tWInq/f41beae65b.png
> [1] http://stackoverflow.com/questions/38114306/creati

Re: Implicit file-size limit of input files?

2017-02-11 Thread Dan Halperin
Hi Tobias,

There should be no specific limitations in Beam on file size or otherwise,
obviously different runners and different size clusters will have different
potential scalability.

A few general Beam tips:

* Reading from compressed files is often a bottleneck, as this work is not
parallelizable. If you find reading from compressed files is a bottleneck,
you may want to follow it with a shuffling operation to improve parallelism
as most runners can run the work pre- and post-shuffle on different
machines (with different scaling levels).

* The Partition operator on its own does not improve parallelism. Depending
on how the runner arranges the graph, when you partition N ways you may
still execute all N partitions on the same machine. Again, a shuffling
operator here will often let runners execute the N branches separately (see
the sketch below).

   (There are known issues for certain sinks when N is high. For example,
by default writing to GCS uses a 64 MiB buffer so if you have 10 partitions
you're allocating 640 MiB, per core, just for those network buffers.)
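
A minimal hand-rolled sketch of such a shuffling step (names are placeholders;
a runner-provided reshuffle transform, where available, does the same thing):
pair each element with a random key, group, and re-emit, which lets the runner
redistribute the downstream branches:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

static <T> PCollection<T> breakFusion(PCollection<T> input) {
  PCollection<KV<Integer, T>> keyed =
      input
          .apply("PairWithRandomKey", ParDo.of(new DoFn<T, KV<Integer, T>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              // Spread elements over arbitrary keys so the GroupByKey redistributes them.
              c.output(KV.of(ThreadLocalRandom.current().nextInt(1024), c.element()));
            }
          }))
          .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
  return keyed
      .apply(GroupByKey.<Integer, T>create())
      .apply("Ungroup", ParDo.of(new DoFn<KV<Integer, Iterable<T>>, T>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (T value : c.element().getValue()) {
            c.output(value);
          }
        }
      }))
      .setCoder(input.getCoder());
}

Note that the GroupByKey is what materializes the data at a shuffle boundary
and breaks fusion; it adds real cost, so use it only where the extra
parallelism pays off.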

It sounds like you may be trying to use the "to(Partition function)" method
of writing per window tables. The javadoc for BigQueryIO.Write clearly
documents (
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L232)
that it is not likely to work in "batch" runners.

I suggest reaching out to Google Cloud via the recommendations at
https://cloud.google.com/dataflow/support if you have issues specific to
the Google Cloud Dataflow runner.

Dan

On Fri, Feb 10, 2017 at 3:18 AM, Tobias Feldhaus <
tobias.feldh...@localsearch.ch> wrote:

> Addendum: When running in streaming mode with version 0.5 of the SDK,
> the elements are basically stuck before getting emitted [0], but the whole
> process starts and runs up to the point where the memory is most likely
> full (GC overhead error) and it crashes [0].
>
> It seems like the Reshuffle that is taking place prevents any output from
> happening.
> To get rid of that, I would need to find another way to write to a
> partition in
> BigQuery in batch mode without using the workaround that is described here
> [1],
> but I don't know how.
>
> [0] https://puu.sh/tWInq/f41beae65b.png
> [1] http://stackoverflow.com/questions/38114306/creating-
> writing-to-parititoned-bigquery-table-via-google-cloud-dataflow/40863609#
> 40863609
>
> On 10.02.17, 10:34, "Tobias Feldhaus" 
> wrote:
>
> Hi,
>
> I am currently facing a problem with a relatively simple pipeline [0]
> that is reading gzipped JSON files on Google Cloud Storage (GCS), adding
> a timestamp, and pushing the result into BigQuery. The only other special
> thing I am doing is partitioning it via a PartioningWindowFn that assigns
> a partition to each element, as described here [1].
>
> The pipeline works locally and remotely on the Google Cloud Dataflow
> Service (GCDS) with smaller test files, but when I run it on the roughly
> 100 real ones of about 2 GB each, it breaks down in both streaming and
> batch mode with different errors.
>
> The pipeline runs in batch mode, but in the end it gets stuck, processing
> only 1000-5000 streaming inserts per second to BQ, while constantly
> scaling up the number of instances [2]. As you can see in the screenshot,
> the shuffle never started before I had to stop the job to cut the costs.
>
> If run in streaming mode, the pipeline creation fails because of a
> resource
> allocation failure (Step setup_resource_disks_harness19: Set up of
> resource
> disks_harness failed: Unable to create data disk(s): One or more
> operations
> had an error: [QUOTA_EXCEEDED] 'Quota 'DISKS_TOTAL_GB' exceeded.
> Limit: 8.0) This means it has requested more than 80 (!) TB for
> the job that
> operates on 200 GB compressed (or 2 TB uncompressed) files.
>
> I’ve tried to run it with instances that are as large as n1-highmem-16
> (104 GB memory each) and 1200 GB local storage.
>
> I know this is the Apache Beam mailing list and not intended for GCDF
> support; my question is therefore whether anyone has faced this issue
> with the SDK before, or whether there is a known size limit for files.
>
>
> Thanks,
> Tobias
>
> [0] https://gist.github.com/james-woods/98901f7ef2b405a7e58760057c4816
> 2f
> [1] http://stackoverflow.com/a/40863609/5497956
> [2] https://puu.sh/tWzkh/49b99477e3.png
>
>
>
>


Re: BigQuery join in Apache beam

2017-02-05 Thread Dan Halperin
Definitely, using BigQuery for what BigQuery is really good at (big scans
and cost-based joins) is nearly always a good idea. A strong endorsement of
Ankur's answer.

Pushing the right amount of work into a database is an art, however --
there are some scenarios where you'd rather scan in BQ and join in Beam
because the join result is very large and you can better filter it in Beam,
or because you need to do some pre-join-filtering based on an external API
call (and you don't want to load the results of that API call into
BigQuery)...

I've only seen a few, rare, cases of the latter.
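
For the common case, a minimal sketch (table, dataset, and field names are
placeholders; API shown as in the pre-2.0 Java SDK) that lets BigQuery run the
join and reads only the result into the pipeline:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> joined =
    pipeline.apply(
        "JoinInBigQuery",
        BigQueryIO.Read.fromQuery(
            "SELECT a.user_id, a.event, b.country "
                + "FROM [my-project:my_dataset.events] a "
                + "JOIN [my-project:my_dataset.users] b "
                + "ON a.user_id = b.user_id"));

If the join result itself is huge, or needs per-element filtering against an
external API, the Beam-side alternative is to read both tables, key them, and
join with CoGroupByKey in a DoFn.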

Thanks,
Dan

On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K.  wrote:

> Hi Ankur,
>
> Thank you for your response.
>
> On 5 February 2017 at 23:59, Ankur Chauhan  wrote:
>
>> I have found that doing joins in BigQuery using SQL is a lot faster and
>> easier to iterate on.
>>
>>
>> Ankur Chauhan
>> On Sat, Feb 4, 2017 at 22:05 Prabeesh K.  wrote:
>>
>>> Hi,
>>>
>>> Which is the better way to join two tables in Apache Beam?
>>>
>>> Regards,
>>> Prabeesh K.
>>>
>>
>


Re: Regarding Beam Slack Channel

2017-01-23 Thread Dan Halperin
+1 to please make it an open slack channel if possible. And if not, please
enlarge the set of approvers as quickly as possible.

On Sun, Jan 22, 2017 at 11:10 PM, Ritesh Kasat 
wrote:

> Thanks Jean
>
> —Ritesh
>
> On 22-Jan-2017, at 10:36 PM, Jean-Baptiste Onofré  wrote:
>
> I will.
>
> By the way, I'm not sure it's possible (I'm going to check), but maybe it
> would be great to have an open slack channel (like IRC) without an invite
> needed.
>
> Regards
> JB
> On Jan 23, 2017, at 07:32, Ritesh Kasat  wrote:
>>
>> Hello,
>>
>> Can someone add me to the Beam slack channel?
>>
>> Thanks
>> Ritesh
>>
>>
>


Re: Extending Beam to Write/Read to Apache Accumulo.

2017-01-12 Thread Dan Halperin
Awesome, thanks Wyatt!

On Thu, Jan 12, 2017 at 10:08 AM, Wyatt Frelot  wrote:

> Thanks for the feedback and "points in the right direction". I will create
> a JIRA ticket and coordinate status from that point. Additionally, if I
> have any more questions, I will submit them to the mailing list.
>
> Again, thanks all! I definitely feel welcome!
>
> Wyatt
>
> On Thu, Jan 12, 2017 at 12:45 PM Stephen Sisk  wrote:
>
>> Hi Wyatt!
>>
>> some other info you might find useful:
>> * You might be tempted to implement a Sink - it's the obvious thing in
>> the API for writing to external data stores. However, we're finding it less
>> useful these days and generally discouraging its use unless you're writing
>> to files (which you're not). Instead, if you can, just implement a DoFn
>> that does the write. As Davor mentioned, BigTableIO is a good example of
>> this.
>> * It's useful to understand the lifecycle of DoFns
>> (setup/startBundle/finishBundle/teardown).
>> For example, you'll likely want to batch writes for efficiency - BigTableIO
>> does this by flushing writes stored locally in finishBundle (a sketch
>> follows below).
>> * BigTableIO uses a separate "service" class - that's useful for making
>> your tests simpler by abstracting out the network retry/etc logic
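>>
>> A rough sketch of what that DoFn-based writer could look like (connection
>> settings, table name, and credentials are placeholders, error handling is
>> omitted, and you would still need a Coder for Mutation):
>>
>> import org.apache.accumulo.core.client.BatchWriter;
>> import org.apache.accumulo.core.client.BatchWriterConfig;
>> import org.apache.accumulo.core.client.Connector;
>> import org.apache.accumulo.core.client.Instance;
>> import org.apache.accumulo.core.client.ZooKeeperInstance;
>> import org.apache.accumulo.core.client.security.tokens.PasswordToken;
>> import org.apache.accumulo.core.data.Mutation;
>> import org.apache.beam.sdk.transforms.DoFn;
>>
>> class WriteToAccumuloFn extends DoFn<Mutation, Void> {
>>   private transient Connector connector;
>>   private transient BatchWriter writer;
>>
>>   @Setup
>>   public void setup() throws Exception {
>>     // Placeholder connection settings; a real AccumuloIO would take these as options.
>>     Instance instance = new ZooKeeperInstance("myInstance", "zk1:2181");
>>     connector = instance.getConnector("user", new PasswordToken("secret"));
>>     writer = connector.createBatchWriter("AccumuloTable", new BatchWriterConfig());
>>   }
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext c) throws Exception {
>>     writer.addMutation(c.element());  // the BatchWriter buffers and batches internally
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle(Context c) throws Exception {
>>     writer.flush();  // make this bundle's writes durable before the bundle is committed
>>   }
>>
>>   @Teardown
>>   public void teardown() throws Exception {
>>     if (writer != null) {
>>       writer.close();
>>     }
>>   }
>> }
>>
>> (The separate "service" class mentioned above is where these Accumulo calls
>> would live, so tests can swap in a fake.)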
>>
>> As you'll have noticed by the multiple replies to your message, people
>> are eager to answer questions you might have - feel free to pipe up on the
>> mailing list (dev@ might be more appropriate in that case).
>>
>> S
>>
>> On Wed, Jan 11, 2017 at 9:14 PM Jean-Baptiste Onofré 
>> wrote:
>>
>> Welcome and fully agree with Davor.
>>
>> You can count on me to do the review !
>>
>> Regards
>> JB
>> On Jan 12, 2017, at 06:12, Davor Bonaci  wrote:
>>
>> Hi  Wyatt -- welcome!
>>
>> If you'd like to write a PCollection to Apache Accumulo's key/value
>> store, writing a new IO connector would be the best path forward. Accumulo
>> has somewhat similar concepts to BigTable, so you can use the existing
>> BigTableIO as an inspiration.
>>
>> You are thinking it exactly right -- a connector written in Beam would be
>> runner-independent, and thus can run anywhere.
>>
>> I'm not aware that anybody has started on this one yet -- feel free to
>> file a JIRA to have a place to coordinate if someone else is interested.
>> And, if you get stuck or need help in any way, there are plenty of people
>> on the Beam mailing lists happy to help!
>>
>> Once again, welcome!
>>
>> Davor
>>
>> On Wed, Jan 11, 2017 at 6:04 PM, Wyatt Frelot  wrote:
>>
>> All,
>>
>> Being new to Apache Beam...I want to ensure that I approach things the
>> "right way".
>>
>> My goal:
>>
>> I want to be able to write a PCollection to Apache Accumulo. Something
>> like this:
>>
>>   PCollection.apply( AccumuloIO.Write.to("AccumuloTable"));
>>
>>
>> While I am sure I can create a custom class to do so, it has me thinking
>> about identifying the best way forward.
>>
>> I want to use the Apex Runner to run my applications. Apex has Malhar
>> libraries that are already written that would be really useful. But I
>> don't think that is the point. The goal is to develop IO Connectors that
>> can be applied to any runner. Am I thinking about this correctly?
>>
>> Is there any work being done to develop an IO Connector for Apache
>> Accumulo?
>>
>> Wyatt
>>
>>
>>
>>
>>