Use shared invites for Beam Slack Channel

2018-03-13 Thread Dan Halperin
Slack now has a publicly-endorsed way to post a public shared invite:
https://my.slack.com/admin/shared_invites

The downside is that it apparently has to be renewed monthly.

Maybe Beam should use that instead? The tradeoffs are not obvious, but it
seems like a win:

* forget to renew -> people can't sign up (and reach out to user@?)
vs
* now, always force them to mail user@

Dan

On Tue, Mar 13, 2018 at 11:54 AM, Lukasz Cwik  wrote:

> Invite sent, welcome.
>
> On Tue, Mar 13, 2018 at 11:08 AM, Ramjee Ganti 
> wrote:
>
>> Hello,
>>
>> I have been using Apache Beam and Dataflow for the last few months. Can
>> someone please add me to the Beam Slack channel?
>>
>> Thanks
>> Ramjee
>> http://ramjeeganti.com
>>
>
>


Re: "Radically modular data ingestion APIs in Apache Beam" @ Strata - slides available

2018-03-08 Thread Dan Halperin
Looks like it was a good talk! Why is it Google Confidential & Proprietary,
though?

Dan

On Thu, Mar 8, 2018 at 11:49 AM, Eugene Kirpichov 
wrote:

> Hey all,
>
> The slides for my talk yesterday at Strata San Jose
> https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
> have been posted on the talk page. They may be of interest both to users
> and IO authors.
>
> Thanks.
>


Re: BigQuery join in Apache beam

2017-05-03 Thread Dan Halperin
Hi Prabeesh,

The underlying Beam primitive you use for Join is CoGroupByKey – this takes
N different collections KV<K, V1>, KV<K, V2>, ..., KV<K, VN> and produces
one collection KV<K, [Iterable<V1>, Iterable<V2>, ..., Iterable<VN>]>. This
is a compressed representation of a Join result, in that you can expand it
to a full outer join, you can implement inner join, and you can implement
lots of other join algorithms.

There is also a Join library that does this under the hood:
https://github.com/apache/beam/tree/master/sdks/java/extensions/join-library
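
A minimal sketch of this pattern with the Beam Java SDK; the emails/orders
inputs, their String element types, and the inner-join expansion are
illustrative assumptions rather than code from the thread:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    // Assumed inputs, both keyed by user id:
    //   PCollection<KV<String, String>> emails;  // userId -> email address
    //   PCollection<KV<String, String>> orders;  // userId -> order id
    final TupleTag<String> emailsTag = new TupleTag<String>() {};
    final TupleTag<String> ordersTag = new TupleTag<String>() {};

    // The "compressed" join result: one CoGbkResult per key, holding an
    // Iterable of matching values from each input.
    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(emailsTag, emails)
            .and(ordersTag, orders)
            .apply(CoGroupByKey.<String>create());

    // Expand it into an inner join: keys missing from either side emit nothing.
    PCollection<String> joined =
        grouped.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String userId = c.element().getKey();
            CoGbkResult result = c.element().getValue();
            for (String email : result.getAll(emailsTag)) {
              for (String order : result.getAll(ordersTag)) {
                c.output(userId + "," + email + "," + order);
              }
            }
          }
        }));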


Dan

On Wed, May 3, 2017 at 6:30 AM, Prabeesh K. <prabsma...@gmail.com> wrote:

> Hi Dan,
>
> Sorry for the late response.
>
> I agree with you on the use cases that you mentioned.
>
> Please advise, and share any sample code for joining two data sets in Beam
> that share some common keys.
>
> Regards,
> Prabeesh K.
>
> On 6 February 2017 at 10:38, Dan Halperin <dhalp...@google.com> wrote:
>
>> Definitely, using BigQuery for what BigQuery is really good at (big scans
>> and cost-based joins) is nearly always a good idea. A strong endorsement of
>> Ankur's answer.
>>
>> Pushing the right amount of work into a database is an art, however --
>> there are some scenarios where you'd rather scan in BQ and join in Beam
>> because the join result is very large and you can better filter it in Beam,
>> or because you need to do some pre-join-filtering based on an external API
>> call (and you don't want to load the results of that API call into
>> BigQuery)...
>>
>> I've only seen a few, rare, cases of the latter.
>>
>> Thanks,
>> Dan
>>
>> On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K. <prabsma...@gmail.com> wrote:
>>
>>> Hi Ankur,
>>>
>>> Thank you for your response.
>>>
>>> On 5 February 2017 at 23:59, Ankur Chauhan <an...@malloc64.com> wrote:
>>>
>>>> I have found that doing joins in BigQuery using SQL is a lot faster and
>>>> easier to iterate on.
>>>>
>>>>
>>>> Ankur Chauhan
>>>> On Sat, Feb 4, 2017 at 22:05 Prabeesh K. <m...@prabeeshk.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Which is the better way to join two tables in Apache Beam?
>>>>>
>>>>> Regards,
>>>>> Prabeesh K.
>>>>>
>>>>
>>>
>>
>


Re: Using R in Data Flow

2017-04-12 Thread Dan Halperin
Hi Anant,

The blog post about R-on-Dataflow should work for R-on-Beam -- it just
predates Beam; there is no longer any Dataflow Python that isn't based on
Beam :)

What have you tried?

Thanks,
Dan

On Mon, Apr 10, 2017 at 11:18 PM, Anant Bhandarkar <
anant.bhandar...@impactanalytics.co> wrote:

> Hi,
> We are using BigQuery for our querying needs.
> We are also looking to use Dataflow with some of the statistical
> libraries. We are using R libraries to build these statistical models.
>
> We are looking to run our data through statistical models such as ELM,
> GAM, ARIMA, etc. We see that Python doesn't have all of these libraries,
> which we get as CRAN packages in R.
>
> We have seen this example where there is a possibility to run R on data
> flow.
>
>
> https://medium.com/google-cloud/cloud-dataflow-can-autoscale
> -r-programs-for-massively-parallel-data-processing-492b57bd732d
> https://github.com/gregmcinnes/incubator-beam/blob/python-
> sdk/sdks/python/apache_beam/examples/complete/wordlength/
> wordlength_R/wordlength_R.py
>
> If we are able to use the parallelization provided by Dataflow along with R
> libraries, this would be great for us as a team and also for the whole data
> science community, which relies on R packages.
>
> We would need some help from the Beam community to achieve this.
>
> I see this as a very good use case for the whole data science community,
> enabling usage of both Python and R on Beam and Dataflow.
>
> Regards,
> Anant
>
>


Re: UserCodeExecption

2017-04-12 Thread Dan Halperin
Hi Anil,

It looks to me like you are not using Apache Beam -- e.g., I
see com.google.cloud.dataflow.sdk.util.UserCodeException which is not a
Beam class. Additionally, the documentation on cloud.google.com is not for
Apache Beam -- for that, see https://beam.apache.org -- e.g., the Beam Java
Quickstart.

Can you please try with the Apache Beam 0.6.0 release? And make sure that your
pipeline works in the DirectRunner before trying it on any of the other
runners, including DataflowRunner.

Thanks,
Dan

On Wed, Apr 12, 2017 at 5:07 AM, Anil Srinivas  wrote:

> Hi,
>  Yes, I have activated the Dataflow API and the pipeline executes locally.
> The problem is when I try to execute it on the Cloud Dataflow service.
>
>
>
> Thanks and Regards
> Anil
>
> On Apr 12, 2017 17:33, "Alexandre Crayssac" 
> wrote:
>
>> Hi,
>>
>> Have you activated the Dataflow API in the GCP console?
>>
>> Regards,
>>
>> Alexandre
>>
>> On Wed, Apr 12, 2017 at 11:52 AM, Anil Srinivas <
>> anil.b...@impactanalytics.co> wrote:
>>
>>> Hi,
>>> I have been trying to execute the word-count example on the Cloud Dataflow
>>> service as per the documentation at
>>> https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
>>> Even after following the exact same steps as mentioned in the document, I
>>> am getting the following error:
>>>
>>> Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException:
>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>> com.google.cloud.dataflow.sdk.util.UserCodeException:
>>> java.io.IOException: com.google.api.client.googleap
>>> is.json.GoogleJsonResponseException: 400 Bad Request
>>> {
>>>   "code" : 400,
>>>   "errors" : [ {
>>> "domain" : "global",
>>> "message" : "No object name",
>>> "reason" : "required"
>>>   } ],
>>>   "message" : "No object name"
>>> }
>>> at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(Us
>>> erCodeException.java:35)
>>> at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(
>>> UserCodeException.java:40)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCo
>>> deException(DoFnRunnerBase.java:369)
>>> at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokePr
>>> ocessElement(SimpleDoFnRunner.java:51)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processEle
>>> ment(DoFnRunnerBase.java:139)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.p
>>> rocessElement(SimpleParDoFn.java:188)
>>> at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDo
>>> Fn.processElement(ForwardingParDoFn.java:42)
>>> at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerL
>>> oggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperat
>>> ion.process(ParDoOperation.java:55)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.OutputRecei
>>> ver.process(OutputReceiver.java:52)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1
>>> .output(SimpleParDoFn.java:158)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:288)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProces
>>> sContext.output(DoFnRunnerBase.java:450)
>>> at com.google.cloud.dataflow.sdk.runners.worker.CombineValuesFn
>>> Factory$ExtractOutputDoFn.processElement(CombineValuesFnFact
>>> ory.java:270)
>>> at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokePr
>>> ocessElement(SimpleDoFnRunner.java:49)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processEle
>>> ment(DoFnRunnerBase.java:139)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.p
>>> rocessElement(SimpleParDoFn.java:188)
>>> at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDo
>>> Fn.processElement(ForwardingParDoFn.java:42)
>>> at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerL
>>> oggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperat
>>> ion.process(ParDoOperation.java:55)
>>> at com.google.cloud.dataflow.sdk.util.common.worker.OutputRecei
>>> ver.process(OutputReceiver.java:52)
>>> at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1
>>> .output(SimpleParDoFn.java:158)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:288)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContex
>>> t.outputWindowedValue(DoFnRunnerBase.java:284)
>>> at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProces
>>> sContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
>>> at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndComb
>>> ineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:203)
>>> at 

Re: BigQueryIO - Why is CREATE_NEVER not supported when using a tablespec?

2017-04-07 Thread Dan Halperin
Hi Josh,
You raise a good point. I think we had put this check in (long before
partition tables existed) because we need a schema to create a table and we
assumed the number of tables would be unbounded. But now it's an outdated
check, overly conservative, and it probably should be removed.

Would you like to send a PR to fix this?

Thanks,
Dan
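
For reference, a rough sketch of computing the per-window partition decorator
Josh describes below. The project, dataset, and table names are placeholders,
and the exact hook for passing a tablespec function to BigQueryIO.Write differs
between SDK versions, so this only illustrates deriving the $YYYYMMDD suffix
from a (daily) window:

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    /** Maps a daily window to a BigQuery partition decorator, e.g. "...$20170407". */
    static String tableSpecFor(BoundedWindow window) {
      DateTimeFormatter day = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();
      // Assumes daily fixed windows, so the window start identifies the partition.
      return "my-project:dataset.my_table$" + day.print(((IntervalWindow) window).start());
    }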


On Fri, Apr 7, 2017 at 10:03 AM, Josh  wrote:

> Hi all,
>
> I have a use case where I want to stream into BigQuery, using a tablespec
> but with CreateDisposition.CREATE_NEVER. I want to partition/shard my data
> by date, and use BigQuery's date partitioning feature within a single table
> (rather than creating a new BigQuery table for every day). In this case
> writes would be made to a partition in a single table, e.g.
> `my-project:dataset.my_table$20170407`, and in my tablespec I would just
> be choosing the partition decorator using the window.
>
> Unfortunately this doesn't seem possible with BigQueryIO at the moment,
> because it requires me to use CreateDisposition.CREATE_IF_NEEDED. I can't
> use CreateDisposition.CREATE_IF_NEEDED because it requires me to provide
> a table schema and my BigQuery schema isn't available at compile time.
>
> Is there any good reason why CREATE_NEVER is not allowed when using a
> tablespec?
>
> Thanks,
> Josh
>


Re: Reading/ writing xml file hangs indefinitely

2017-04-07 Thread Dan Halperin
Hi Richard,

I wonder if you're being hit by
https://issues.apache.org/jira/browse/BEAM-1309 -- namely, that the entire
/tmp directory might be being traversed.

As a sanity check, can you try moving your test file into a more specific
folder, like /tmp/beam/test_input/input.xml?

If this resolves your issue, it's a good argument for prioritizing fixing
that issue ;)

Dan

On Fri, Apr 7, 2017 at 5:37 AM, Richard Hanson <rhan...@mailbox.org> wrote:

>
> On 06 April 2017 at 19:53 Dan Halperin <dhalp...@google.com> wrote:
>
> Hi Richard,
>
> Can you share a little more info about your environment? Here's a
> smattering of questions for which answers may be useful.
>
> * What runner are you using?
>
> I don't specify any runner so I believe it should be use direct runner.
>
> * What version of the SDK?
>
> Apache Beam version is 0.6.0 (beam-sdks-java-core
> and beam-runners-direct-java)
>
> * Does this reproduce in the DirectRunner?
>
> This problem I believe happens while running DirectRunner.
>
> * Can you share a full reproduction? (e.g., in a github gist)?
>
> JDK: 1.8.0_121
>
> Scala: 2.12.1
>
> sbt: 0.13.13
>
>
> Below is the sample xml file
>
> <?xml version="1.0" encoding="UTF-8"?>
> <customers>
>   <customer id="...">
>     <age>33</age>
>     <name>John Smith</name>
>   </customer>
> </customers>
>
>
> The sample record object.
>
> @XmlRootElement
> class Customer {
>
> private var name: String = ""
>
> private var age: Int = 0
>
> private var id: Int = -1
>
> def getName():String = name
>
> @XmlElement
> def setName(name: String) = this.name = name
>
> def getAge(): Int = age
>
> @XmlElement
> def setAge(age: Int) = this.age = age
>
> def getId(): Int = id
>
> @XmlAttribute
> def setId(id: Int) = this.id = id
>
> }
>
> Pipeline procedure of the code.
>
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
> val source = XmlSource.from[Customer](
> new File("customers.xml").toPath.toString
> ).withRootElement("customers").withRecordElement("customer").
> withRecordClass(classOf[Customer])
>
> val sink = XmlSink.write().toFilenamePrefix("xmlout").
> ofRecordClass(classOf[Customer]).
> withRootElement("customers")
>
> p.apply(Read.from(source)).apply(Write.to(sink))
>
> p.run.waitUntilFinish
>
>
> * What is happening on the machine(s) executing the job? Is there high
> CPU? Is the disk active? Etc.
>
> There is high CPU usage, which stays at 99.x% while the Java process is
> executing (when checking with the top command).
>
> 7624 user  20   0 2837.6m 582.5m  23.5m S 99.3 11.4   2:42.11 java
>
> Monitoring with iotop shows disk I/O is (mostly) performed by system
> processes, e.g. kworker. Only once or twice did I see the Java process (the
> only user process running on the machine) doing disk I/O.
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
>
>
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
> 7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]
>
>
> Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
> Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
> TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
>
> 7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
> 7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
>
>
> Thanks,
> Dan
>
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson <rhan...@mailbox.org>
> wrote:
>
> I am testing Apache Beam to read/write XML files. But I have encountered a
> problem: even though the code just reads a single XML file and writes it
> out without doing any transformation, the process seems to hang
> indefinitely. The output looks like below:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io
> .FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write
> operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
>
> The code basically do the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPa
> th.toString).withRootElement("RootE").withRecordElement("
> Record").withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFil
> enamePrefix("xml").ofRecordClass(classOf[Record]).
> withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
>
> What part may be missing in my program?
>
> Thanks
>
>
>


Re: automatic runner inference

2017-04-03 Thread Dan Halperin
+Thomas

I thought, perhaps erroneously, that if there is only 1 runner on the
classpath we did this already. Though of course it wouldn't work in almost
any case -- most runners require extra command line flags.
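
A minimal sketch of the command-line approach this refers to: read the runner
(and its extra flags) from args instead of hard-coding setRunner, so one jar
can be pointed at different engines. The flag values shown are assumptions
about what a particular deployment would pass:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public static void main(String[] args) {
      // --runner and any runner-specific flags come from the command line,
      // so the same jar can target the direct runner, Flink, Dataflow, etc.
      PipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().create();
      Pipeline p = Pipeline.create(options);
      // ... build the pipeline ...
      p.run().waitUntilFinish();
    }

    // Example invocations (the chosen runner must be on the classpath):
    //   java -jar pipeline.jar --runner=DirectRunner
    //   java -jar pipeline.jar --runner=FlinkRunner --flinkMaster=<host:port>
    //   java -jar pipeline.jar --runner=DataflowRunner --project=<gcp-project> \
    //       --tempLocation=gs://<bucket>/tmp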

On Mon, Apr 3, 2017 at 12:33 PM, Antony Mayi  wrote:

> Currently I need to compile my pipeline with options.setRunner(XY.class).
> Is it not possible to infer the runner implicitly? I.e., ultimately having
> one jar file that I can submit to any platform and it would run (currently
> I need to recompile or specify some switches before running it on a
> different engine). Some kind of platform detection with a potential fallback
> to the direct runner.
>
> thanks,
> antony.
>


Re: Beam partitioned file reading and writing

2017-03-29 Thread Dan Halperin
Hi Billy,

Thanks for that feedback -- truly invaluable.

Would you mind filing a bug in the Flink runner component in JIRA with this
information? There may be a simple fix :)
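
For reference, a minimal sketch of the "one DoFn with N outputs" approach
recommended further down this thread, written against the current Beam Java
API (the 0.x SDKs spelled the multi-output calls slightly differently). The
Destination enum and the input collection are assumptions for illustration:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // Assumed: enum Destination { CURRENT, HISTORY }
    //          PCollection<KV<Destination, GenericRecord>> input;
    final TupleTag<GenericRecord> currentTag = new TupleTag<GenericRecord>() {};
    final TupleTag<GenericRecord> historyTag = new TupleTag<GenericRecord>() {};

    PCollectionTuple split = input.apply("SplitByDestination",
        ParDo.of(new DoFn<KV<Destination, GenericRecord>, GenericRecord>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Route each record to the output matching its enum, dropping the key.
            if (c.element().getKey() == Destination.CURRENT) {
              c.output(c.element().getValue());              // main output
            } else {
              c.output(historyTag, c.element().getValue());  // additional output
            }
          }
        }).withOutputTags(currentTag, TupleTagList.of(historyTag)));

    PCollection<GenericRecord> current = split.get(currentTag);
    PCollection<GenericRecord> history = split.get(historyTag);
    // ... write each branch as desired, e.g. with AvroIO.writeGenericRecords(schema),
    // after setting an AvroCoder on each output.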

On Wed, Mar 29, 2017 at 8:06 AM, Newport, Billy 
wrote:

> We tried that, Robert, on Flink and it's pretty awful performance-wise. We
> doubled the speed by using a new HadoopOutputFormat which basically has N
> HadoopOutputFormats inside it, and the writeRecord looks at the enum and
> writes the record to the appropriate “inner” output format.
>
>
>
> Most of the performance problems we’ve had so far are all forcing us to
> kind of hack the data flows in unnatural ways to avoid serialization costs.
>
>
>
> For example, read a file and split into two datasets based on a column
> predicate. Approach 1:
>
>
>
> READ FILE -> Filter 1 -> Dataset
>           -> Filter 2 -> Dataset
>
>
>
> Is slower than:
>
>
>
> READ FILE -> Filter 1 -> Dataset
>
> READ FILE -> Filter 2 -> Dataset
>
>
>
> Reading the file twice is faster! Go figure.
>
>
>
>
>
>
>
> *From:* Robert Bradshaw [mailto:rober...@google.com]
> *Sent:* Tuesday, March 28, 2017 4:25 AM
> *To:* user@beam.apache.org
> *Subject:* Re: Beam partitioned file reading and writing
>
>
>
> I would simply use a Partition transform to convert the
> PCollection<KV<Enum, GenericRecord>> data to a list
> of PCollection<GenericRecord> and then write each one as desired. Either
> that or a DoFn with multiple side outputs (one for each enum value).
> There's no reason this should perform poorly on the runners I am aware of,
> and is the most straightforward (and least error-prone) solution. (I don't
> know what your original solution looked like, but if you used N filters
> rather than one DoFn with N outputs that would perform much worse.)
>
>
>
> https://beam.apache.org/documentation/programming-
> guide/#transforms-flatten-partition
> 
>
>
>
> On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian 
> wrote:
>
> We had to implement a custom sink in order to get partitioned destinations.
> This is the least hacky solution, but the downsides of this approach are
> that it is a lot of code and that custom sinks only work in batch mode. If
> you need to do this in streaming mode you may be able to get away with
> writing as a side effect of a ParDo, but you have to account for concurrent
> writes, and chunks might be processed more than once so you have to account
> for duplicate values in your output.
>
>
>
> On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy 
> wrote:
>
> Is there built-in support for writing partitioned collections? For example:
>
>
>
> PCollection<KV<Enum, GenericRecord>> data;
>
>
>
> We want to write the GenericRecords in data into different files based on
> the enum. We did this in Flink by making a proxy HadoopOutputFormat
> which has the N real output formats, and its write method checks the enum and
> forwards the write call for the GenericRecord to the correct output format.
>
>
>
> Given the lack of Beam Parquet support, the final writer we want to use
> with Beam is Avro.
>
>
>
> We used the proxy output format trick in Flink because performance was very
> poor using a filter to split it and then a map to convert from
> KV<Enum, GenericRecord> to just GenericRecord.
>
>
>
> I’m nervous about using side outputs in Beam, given I think they will be
> implemented as described here, which performs poorly.
>
>
>
> So basically, has anyone implemented a demuxing AvroIO.Writer?
>
>
>
> Thanks
>
>
>


Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-21 Thread Dan Halperin
[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no
checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan
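
A minimal sketch of the configuration being discussed, using the Beam Java
KafkaIO (2.x-era method names; servers, topic, and the checkpoint interval are
placeholders). The checkpoint mark is persisted by the runner, so the consumer
property below only applies when there is no checkpoint/savepoint to restore:

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    PCollection<KV<Long, String>> records =
        p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("kafka-broker:9092")
            .withTopic("events")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Only consulted when the runner has no KafkaCheckpointMark to
            // restore, i.e. a fresh start or checkpointing disabled.
            // (Newer SDKs spell this withConsumerConfigUpdates.)
            .updateConsumerProperties(
                Collections.<String, Object>singletonMap("auto.offset.reset", "latest"))
            .withoutMetadata());

    // On the Flink runner, enable checkpointing (and use savepoints on upgrade)
    // so the offsets in the KafkaCheckpointMark survive restarts, e.g.:
    //   --runner=FlinkRunner --checkpointingInterval=60000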

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:

> In the SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
> Can it restore during a job restart? -- I have not tested the runner in
> streaming for some time.
>
> Regarding data completeness, I would use at-most-once when a little missing
> data (mostly from task-node failures) is tolerated, compared to the
> performance cost introduced by 'state'/'checkpoint'.
>
> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
> >
> > > Move discuss to dev-list
> > >
> > > Savepoint in Flink, also checkpoint in Spark, should be good enough to
> > > handle this case.
> > >
> > > When people don't enable these features, for example only need
> > at-most-once
> > >
> > The Spark runner forces checkpointing on any streaming (Beam)
> application,
> > mostly because it uses mapWithState for reading from UnboundedSource and
> > updateStateByKey form GroupByKey - so by design, Spark runner is
> > at-least-once. Generally, I always thought that applications that require
> > at-most-once are more focused on processing time only, as they only care
> > about whatever get's ingested into the pipeline at a specific time and
> > don't care (up to the point of losing data) about correctness.
> > I would be happy to hear more about your use case.
> >
> > > semantic, each unbounded IO should try its best to restore from last
> > > offset, although CheckpointMark is null. Any ideas?
> > >
> > > Mingmin
> > >
> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org>
> > wrote:
> > >
> > > > hey,
> > > >
> > > > The native Beam UnboundedSource API supports resuming from checkpoint
> > --
> > > > that specifically happens here
> > > > <
> > > https://github.com/apache/beam/blob/master/sdks/java/io/kafk
> > a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
> > > when
> > > > the KafkaCheckpointMark is non-null.
> > > >
> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
> > most
> > > > recent savepoint upon restore.
> > > >
> > > > There shouldn't be any "special" Flink runner support needed, nor is
> > the
> > > > State API involved.
> > > >
> > > > Dan
> > > >
> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > > > wrote:
> > > >
> > > >> Would not it be Flink runner specific ?
> > > >>
> > > >> Maybe the State API could do the same in a runner agnostic way (just
> > > >> thinking loud) ?
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> > > >>
> > > >>> From KafkaIO itself, looks like it either start_from_beginning or
> > > >>> start_from_latest. It's designed to leverage
> > > >>> `UnboundedSource.CheckpointMark`
> > > >>> during initialization, but so far I don't see it's provided by
> > runners.
> > > >>> At the
> > > >>> moment Flink savepoints is a good option, created a JIRA(BEAM-1775
> > > >>> <https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it
> in

Re: Implicit file-size limit of input files?

2017-02-11 Thread Dan Halperin
Hi Tobias,

There should be no specific limitations in Beam on file size or otherwise;
obviously, different runners and different-sized clusters will have different
potential scalability.

A few general Beam tips:

* Reading from compressed files is often a bottleneck, as this work is not
parallelizable. If you find that it is, you may want to follow the read with
a shuffling operation to improve parallelism, as most runners can run the
pre- and post-shuffle work on different machines (with different scaling
levels); see the sketch after this list.

* The Partition operator on its own does not improve parallelism. Depending
on how the runner arranges the graph, when you partition N ways you may
still execute all N partitions on the same machine. Again, a shuffling
operator here will often let runners execute the N branches separately.

   (There are known issues for certain sinks when N is high. For example,
by default writing to GCS uses a 64 MiB buffer so if you have 10 partitions
you're allocating 640 MiB, per core, just for those network buffers.)
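
A minimal sketch of the first tip above, assuming a recent Beam Java SDK where
Reshuffle is a public transform (file paths are placeholders). The shuffle
breaks fusion between the non-parallel decompression and the downstream work
so the latter can be redistributed across workers:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    // Gzip is detected from the file extension; the read itself cannot be split.
    PCollection<String> lines =
        p.apply(TextIO.read().from("gs://my-bucket/input/*.json.gz"));

    // Redistribute the decompressed records so downstream transforms parallelize.
    PCollection<String> shuffled = lines.apply(Reshuffle.<String>viaRandomKey());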

It sounds like you may be trying to use the “to(Partition function)” method
of writing per-window tables. The javadoc for BigQueryIO.Write clearly
documents (
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L232)
that it is not likely to work in "batch" runners.

I suggest reaching out to Google Cloud via the recommendations at
https://cloud.google.com/dataflow/support if you have issues specific to
the Google Cloud Dataflow runner.

Dan

On Fri, Feb 10, 2017 at 3:18 AM, Tobias Feldhaus <
tobias.feldh...@localsearch.ch> wrote:

> Addendum: When running in streaming mode with version 0.5 of the SDK,
> the elements are basically stuck before getting emitted [0]; the whole
> process starts and runs up to the point where, most likely, the memory is
> full (GC overhead error) and it crashes [0].
>
> It seems like the Reshuffle that is taking place prevents any output from
> happening.
> To get rid of that, I would need to find another way to write to a
> partition in
> BigQuery in batch mode without using the workaround that is described here
> [1],
> but I don't know how.
>
> [0] https://puu.sh/tWInq/f41beae65b.png
> [1] http://stackoverflow.com/questions/38114306/creating-
> writing-to-parititoned-bigquery-table-via-google-cloud-dataflow/40863609#
> 40863609
>
> On 10.02.17, 10:34, "Tobias Feldhaus" 
> wrote:
>
> Hi,
>
> I am currently facing a problem with a relatively simple pipeline [0] that
> is reading gzipped JSON files on Google Cloud Storage (GCS), adding a
> timestamp, and pushing them into BigQuery. The only other special thing I am
> doing is partitioning via a PartioningWindowFn that assigns a partition to
> each element, as described here [1].
>
> The pipeline works locally and remotely on the Google Cloud Dataflow
> Service (GCDS) with smaller test files, but if I run it on the roughly 100
> real ones of 2 GB each, it breaks down in both streaming and batch mode with
> different errors.
>
> The pipeline runs in batch mode, but in the end it gets stuck processing
> only 1000-5000 streaming inserts per second to BQ, while constantly scaling
> up the number of instances [2]. As you can see in the screenshot, the
> shuffle never started before I had to stop it to cut the costs.
>
> If run in streaming mode, the pipeline creation fails because of a
> resource
> allocation failure (Step setup_resource_disks_harness19: Set up of
> resource
> disks_harness failed: Unable to create data disk(s): One or more
> operations
> had an error: [QUOTA_EXCEEDED] 'Quota 'DISKS_TOTAL_GB' exceeded.
> Limit: 8.0) This means, it has requested more than 80 (!) TB for
> the job that
> operates on 200 GB compressed (or 2 TB uncompressed) files.
>
> I’ve tried to run it with instances that are as large as n1-highmem-16
> (104 GB memory each) and 1200 GB local storage.
>
> I know this is the Apache Beam mailing list and not intended for GCDF
> support; my question is therefore whether anyone has faced this issue with
> the SDK before, or whether there is a known size limit for files.
>
>
> Thanks,
> Tobias
>
> [0] https://gist.github.com/james-woods/98901f7ef2b405a7e58760057c4816
> 2f
> [1] http://stackoverflow.com/a/40863609/5497956
> [2] https://puu.sh/tWzkh/49b99477e3.png
>
>
>
>