I'd also like to put the perspective out there that composite transforms are like subroutines; their inner complexity should not concern the end user and is probably the wrong thing to optimize for (assuming there are no other costs, e.g. performance, and of course we shouldn't have unnecessary complexity). This allows operations like Write (as an example) to be built out of the same kind of simpler pipeline operations, rather than having special classes of operators such as sinks (which still need all the complexity (write out temporary shards, move successful ones to a consistent naming, remove temporaries, etc.), but where the structure is hard-coded into the system and so less flexible).
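To make the subroutine analogy concrete, here is a minimal sketch in plain Python. It is not Beam's Write or FileIO implementation; every function name, the `.temp` directory, and the shard naming scheme are assumptions made up for illustration. It only shows how the three steps listed above can be ordinary, separately readable operations behind a single `write()` call.

```python
import os
import tempfile
from pathlib import Path


def write_temp_shards(records, temp_dir, num_shards):
    """Step 1: write each shard to a temporary file and return the paths."""
    temp_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for i in range(num_shards):
        shard = records[i::num_shards]  # round-robin assignment of records
        path = temp_dir / f"shard-{i}.tmp"
        path.write_text("".join(f"{r}\n" for r in shard))
        paths.append(path)
    return paths


def finalize_shards(temp_paths, out_dir, prefix):
    """Step 2: move successful shards to a consistent final naming."""
    out_dir.mkdir(parents=True, exist_ok=True)
    total = len(temp_paths)
    finals = []
    for i, tmp in enumerate(temp_paths):
        final = out_dir / f"{prefix}-{i:05d}-of-{total:05d}.txt"
        os.replace(tmp, final)  # a rename, not a copy
        finals.append(final)
    return finals


def remove_temporaries(temp_dir):
    """Step 3: delete leftovers (e.g. from failed attempts) and the temp dir."""
    for leftover in temp_dir.glob("*"):
        leftover.unlink()
    temp_dir.rmdir()


def write(records, out_dir, prefix="out", num_shards=2):
    """The 'composite': callers see one operation, not the three steps."""
    out_dir = Path(out_dir)
    temp_dir = out_dir / ".temp"  # hypothetical temp location
    temp_paths = write_temp_shards(list(records), temp_dir, num_shards)
    finals = finalize_shards(temp_paths, out_dir, prefix)
    remove_temporaries(temp_dir)
    return finals


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        for path in write(["a", "b", "c"], d, prefix="wc"):
            print(path.name)
```

A caller only ever invokes `write()`; the inner steps stay visible for anyone who wants to inspect or replace them, which is the flexibility a hard-coded sink would not offer.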
Better representations of this hierarchical structure (vs. being forced to look at the entire everything-unrolled-and-inlined view) are what I think we should solve long-term. For word count it's annoying (and surprising) that so much is going on; this becomes especially important for larger pipelines with hundreds or thousands of stages that may have high-level structure but become impenetrable in the flattened, physical view. At the very least, it'd be useful if these UIs had hooks where we could map physical views into the logical views as understood by Beam. How best to represent these mappings is as yet an unsolved problem.

- Robert

On Fri, Nov 30, 2018 at 12:40 PM Maximilian Michels wrote:
>
> Hi Akshay,
>
> I think you're bringing up a very important point. Simplicity with
> minimal complexity is something that we strive for. In the case of the
> Write transform, the complexity was mainly added due to historical
> reasons which Kenneth mentioned.
>
> It is worth noting that some Runners don't even benefit from it because
> they don't support incremental recovery. I believe we will do work in the
> future to simplify the Write transform.
>
> If you look at other pipelines which don't use that transform you will
> find that they are much simpler.
>
> What can really help is to not expand the composite transforms, but
> transforms need to be expanded during translation and collapsing those
> transforms again after translation to Spark/Flink can be tricky.
>
> Generally speaking, we have realized this is an issue and have plans to
> fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.
>
> Thanks,
> Max
>
> On 28.11.18 16:52, Kenneth Knowles wrote:
> > In fact, that logic in FileIO is "required" to have consistent output
> > even just for batch computation, because any step along the way may fail
> > and retry.
> >
> > I put "required" in quotes because there's a legacy concern at play here
> > - FileIO is written using the assumption that shuffles are
> > checkpointing. What is actually required is that the input to the last
> > stage will be the same, even if the whole pipeline goes down and has to
> > be brought back up. So the extra shuffle in those pipelines represents a
> > necessary checkpoint prior to running the last stage. In the
> > saveAsTextFile primitive (caveat: my understanding is vague and stale)
> > this would be done in a checkpoint finalization callback, and you have
> > to wait for that to complete before consuming the output if you want to
> > ensure correctness.
> >
> > Another relevant piece of information is that Beam has support for two
> > things that would make it easier to decipher the UI:
> >
> > 1. Nodes can have meaningful names. So that would make it obvious which
> > part is doing what.
> > 2. Transforms can be built as composites of other transforms, and this
> > is encouraged. In some UIs, notably Cloud Dataflow, the composites are
> > shown as a single box, so it is easier to understand.
> >
> > I would not expect every engine to adopt all of Beam's features like
> > these, but there might be a clever way to make the information available.
> >
> > Kenn
> >
> > On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson wrote:
> >
> >     Hi Akshay
> >
> >     My understanding is that this all comes from the final FileIO
> >     write() stage.
> >
> >     When writing, the FileIO puts data into temporary files similar to
> >     the output formats for Hadoop MapReduce. Once ready to commit, it
> >     does something along the lines of a directory scan to determine
> >     which files need to be moved into the final output location. It is
> >     that directory scan stage that causes the complex DAG and it runs
> >     very quickly.
> >     While it looks complicated, I gather it is necessary
> >     to support the needs of batch/streaming and in particular the
> >     behaviour under failure scenarios.
> >
> >     I agree with you that from a developer perspective it is very
> >     difficult to understand. If you were to replace the FileIO write()
> >     with e.g. a push into a database (JdbcIO), or ElasticsearchIO or
> >     SolrIO etc. you will see a much more familiar and simpler to
> >     understand DAG - it might be worth trying that to see. Over time I
> >     expect you will simply ignore that final job when looking at the DAG
> >     as you know it is just the output committer stage.
> >
> >     I don't know if you are using HDFS but if so, please be aware of
> >     BEAM-5036 [1] which is fixed in 2.9.0-SNAPSHOT and will be released
> >     with 2.9.0 in the coming days. It relates to what I outline above,
> >     where the files were actually copied into place rather than simply
> >     moved. On my jobs, I saw a very large increase in performance
> >     because of this, and it brought Beam much closer to native Spark in
> >     terms of runtime performance.
> >
> >     I hope this helps,
> >     Tim
> >
> >     [1] https://issues.apache.org/jira/browse/BEAM-5036
> >
> >     On Wed, Nov 28, 2018 at 7:34 AM Akshay Mendole wrote:
> >
> >         Hi,
> >         We are in the process of evaluating different execution
> >         engines (mainly Apache Spark and Apache Flink) for our
> >         production batch and streaming pipelines. We thought of using
> >         Apache Beam as a unified programming model framework to write
> >         the pipelines. When we executed a simple wordcount pipeline using
> >         both flink-runner and spark-runner, we saw that the DAG for the
> >         pipeline in both Flink and Spark when executed using Beam code
> >         had a lot of operators/nodes which could not be explained. When we
> >         wrote the same wordcount program using the APIs provided by the
> >         underlying execution engine, the DAGs were much simpler and
> >         could be easily explained.
> >         Below is an example of a wordcount program executed in Spark.
> >
> >         This is the DAG when we executed this
> >         <https://pastebin.com/3MZZPgJk> code developed using Spark RDD APIs.
> >         [Screen Shot 2018-11-28 at 11.39.35 AM.png]
> >
> >         This is the DAG when we executed this
> >         <https://pastebin.com/ABtUDmvC> code developed using Beam
> >         pipeline APIs.
> >         [Screen Shot 2018-11-28 at 11.40.04 AM.png]
> >         [Screen Shot 2018-11-28 at 11.40.11 AM.png]
> >
> >         We observed the *same* *behaviour* when we executed the pipeline
> >         using the Flink runner.
> >         While this is a simple word count, we observed that when we wrote
> >         our complex pipelines in Beam and executed them, they led to DAGs
> >         which were almost impossible to explain :-( .
> >
> >         We have the following concerns regarding the same:
> >         1. Is the gigantic DAG expected?
> >         2. If so, why so? And will it cause any performance impacts?
> >         3. Since the DAG generated cannot be explained, are there better
> >         ways to understand it from a developer's point of view?
> >
> >         It would be great if someone could help us in this regard.
> >
> >         Thanks,
> >         Akshay
