Re: Performance and Cost benchmarking

2022-09-26 Thread Andrew Pilloud via dev
Hi Pranav,

I left some comments on your design. Your doc discusses a bunch of
details about infrastructure such as testing frameworks, automation,
and performance databases, but doesn't describe how it will fit in
with our existing infrastructure (Load Tests, Nexmark, Jenkins,
InfluxDB, Grafana). I suspect we already have most of this
infrastructure built?

What I didn't see (and expected to see) was details on how the tests
would actually interact with IOs. Will there be a generic Schema IO
test harness or do you plan to write one for each IO? Will you be
comparing different data types (data stored as byte[] vs more complex
structures)? What about different IO-specific optimizations (data
sharding, pushdown)?

Andrew

On Mon, Sep 26, 2022 at 9:07 AM Pranav Bhandari
 wrote:
>
> Hello,
>
> Hope this email finds you well. I have attached a link to a doc which 
> discusses the design for a performance and cost benchmarking framework to be 
> used by Beam IOs and Google-provided dataflow templates.
>
> Please feel free to comment on the doc with any questions, concerns or ideas 
> you might have.
>
> Thank you,
> Pranav Bhandari
>
>
> https://docs.google.com/document/d/14GatBilwuR4jJGb-ZNpYeuB-KkVmDvEm/edit?usp=sharing


Re: [idea] A new IO connector named DataLakeIO, which supports connecting Beam to data lakes such as Delta Lake, Apache Hudi, and Apache Iceberg.

2022-09-26 Thread Sachin Agarwal via dev
It turns out there was a commit submitted here!
https://github.com/nanhu-lab/beam/commit/d4f5fa4c41602b4696737929dd1bdd5ae2302a65

Related GH issue: https://github.com/apache/beam/issues/23074

On Tue, Aug 30, 2022 at 10:28 AM Sachin Agarwal  wrote:

> I would posit that something is better than nothing - did we ever see that
> generic implementation?
>
> On Tue, Aug 30, 2022 at 10:22 AM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> Is there enough commonality across Delta, Hudi, Iceberg for this generic
>> solution?  I imagined we'd potentially have individual IOs for each.  A
>> generic one seems possible, but certainly would like to learn more.
>>
>> Also, are others in the community working on connectors for ANY of those
>> Delta Lake, Hudi, or Iceberg IOs?  Would hope for some form of coordination
>> and/or at least awareness between people addressing
>> complementary/overlapping areas.
>>
>> On Mon, Aug 29, 2022 at 4:15 PM Neil Kolban via dev 
>> wrote:
>>
>>> Howdy,
>>> I have a client who would be interested in using this.  Is there a link to
>>> a GitHub repo or another place where I can read more?
>>>
>>> Neil  (kol...@google.com)
>>>
>>> On 2022/08/05 07:23:31 张涛 wrote:
>>> >
>>> > Hi, we developed a new IO connector named DataLakeIO, to connect Beam
>>> with data lakes such as Delta Lake, Apache Hudi, and Apache Iceberg. Beam
>>> can use DataLakeIO to read data from a data lake and write data back to it.
>>> We did not find a data lake IO on
>>> https://beam.apache.org/documentation/io/built-in/, and we want to
>>> contribute this new IO connector to Beam. What should we do next? Thank you
>>> very much!
>>>
>>


Performance and Cost benchmarking

2022-09-26 Thread Pranav Bhandari
Hello,

Hope this email finds you well. I have attached a link to a doc which
discusses the design for a performance and cost benchmarking framework to
be used by Beam IOs and Google-provided dataflow templates.

Please feel free to comment on the doc with any questions, concerns or
ideas you might have.

Thank you,
Pranav Bhandari


https://docs.google.com/document/d/14GatBilwuR4jJGb-ZNpYeuB-KkVmDvEm/edit?usp=sharing


Beam High Priority Issue Report (71)

2022-09-26 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/23350 [Bug]: 
apache_beam.io.gcp.bigquery_file_loads_test.BigQueryFileLoadsIT.test_bqfl_streaming
 - failing test
https://github.com/apache/beam/issues/23306 [Bug]: BigQueryBatchFileLoads in 
python loses data when using WRITE_TRUNCATE
https://github.com/apache/beam/issues/23179 [Bug]: Parquet size exploded for no 
apparent reason
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and 
fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at 
getConnection() in WriteFn
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer 
whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 
failures parent bug
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 
failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure :  
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.runners.core.construction.SerializablePipelineOptions 
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing 
new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21468 
beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load 
tests failing
https://github.com/apache/beam/issues/21463 NPE in Flink Portable 
ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in 
beam_PostCommit_Java_DataflowV2  
https://github.com/apache/beam/issues/21270 
org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView
 flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21266 
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
 is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21257 Either Create or DirectRunner fails 
to produce all elements to the following transform
https://github.com/apache/beam/issues/21123 Multiple jobs running on Flink 
session cluster reuse the persistent Python environment.
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21118 
PortableRunnerTestWithExternalEnv.test_pardo_timers flaky
https://github.com/apache/beam/issues/21114 Already Exists: Dataset 
apache-beam-testing:python_bq_file_loads_NNN
https://github.com/apache/beam/issues/21113 
testTwoTimersSettingEachOtherWithCreateAsInputBounded flaky
https://github.com/apache/beam/issues/2 Java creates an incorrect pipeline 
proto when core-construction-java jar is not in the CLASSPATH
https://github.com/apache/beam/issues/20981 Python precommit flaky: Failed to 
read inputs in the data plane
https://github.com/apache/beam/issues/20977 SamzaStoreStateInternalsTest is 
flaky
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20975 
org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
false] 

Re: [JmsIO] => Pull Request to fix message acknowledgement issue

2022-09-26 Thread BALLADA Vincent via dev

From: Luke Cwik 
Date: Thursday, September 8, 2022 at 19:17
To: BALLADA Vincent 
Cc: dev@beam.apache.org 
Subject: Re: [JmsIO] => Pull Request to fix message acknowledgement issue

Could we have more than one active checkpoint per reader instance?
Yes. Readers are saved and reused across multiple bundles. They aren't always 
closed at bundle boundaries.

Are we sure that all checkpoints are finalized when the reader is closed?
No, readers are closed after a certain period of inactivity. It is likely that
all checkpoints will have expired or been finalized by then, but it is not
guaranteed by the time the reader is closed. For example, in multi-language
pipelines the downstream processing in another language can delay committing
the output to the runner, which can lead to the reader being closed due to
inactivity before its checkpoints are finalized.

We could choose to hand off the session ownership to the JmsCheckpoint and 
create a new one. This way, finalizing the checkpoint would be responsible for 
closing the session.
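As a rough illustration of that handoff, here is a minimal Python sketch using stub classes. The real JmsIO is Java, and names like `JmsCheckpoint` merely echo the discussion here; they are not Beam's actual API.

```python
class Session:
    """Stub standing in for a JMS session; illustrative only."""
    def __init__(self):
        self.closed = False
        self.acknowledged = []

    def acknowledge(self, messages):
        self.acknowledged.extend(messages)

    def close(self):
        self.closed = True


class JmsCheckpoint:
    """Owns the session it was created with; finalizing acks, then closes it."""
    def __init__(self, session, messages):
        self.session = session
        self.messages = messages

    def finalize(self):
        # Acknowledge before closing so the broker does not redeliver.
        self.session.acknowledge(self.messages)
        self.session.close()


class Reader:
    """On each checkpoint, hands the current session off and opens a new one."""
    def __init__(self):
        self.session = Session()
        self.pending = []

    def receive(self, message):
        self.pending.append(message)

    def checkpoint(self):
        mark = JmsCheckpoint(self.session, self.pending)
        self.session = Session()  # fresh session for subsequent messages
        self.pending = []
        return mark
```

The point of the sketch is the single distinct owner: after `checkpoint()` only the mark can touch the old session, so finalization order cannot race with the reader.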




On Thu, Sep 8, 2022 at 8:01 AM BALLADA Vincent
 wrote:
Hello Luke,

Thanks for your remarks.

Connection reuse
Concerning the use of a single connection for the entire process per 
connection factory: that would mean we would have one JMS connection per 
worker, and there may be a downside to doing so.
If the broker is hosted on a multi-node cluster and we want to consume 
messages from all cluster nodes, we have to make sure that we have enough 
connections to be load balanced across all the nodes.
If for some reason (autoscaling, low backlog size) we have only one worker, we 
may not consume from all the cluster nodes.
Since the number of connections is limited by the number of splits/readers, and 
since connections are opened and closed infrequently (when workers are killed 
or created, or readers are closed or started), I would suggest keeping the 
connection management as it is currently.

Session and consumer lifecycle


  1.  Session unique per checkpoint
Could we have more than one active checkpoint per reader instance?

Should we close the session/consumer and create a new session/consumer at the 
end of finalizeCheckpoint? The goal here is to ensure that the message 
acknowledgement occurs before the session is closed.
If advance and finalizeCheckpoint can be called concurrently, we need to make 
sure that the session is active in “advance” in order to receive messages.
Are we sure that all checkpoints are finalized when the reader is closed?


  2.  Session scoped to the reader start/close
It seems to be more or less the case currently.

Regards

Vincent BALLADA


From: Luke Cwik via dev 
Date: Thursday, September 1, 2022 at 18:48
To: dev 
Subject: Re: [JmsIO] => Pull Request to fix message acknowledgement issue

I have a better understanding of the problem after reviewing the doc and we 
need to decide on what lifecycle scope we want the `Connection`, `Session`, and 
`MessageConsumer` to have.

It looks like for the `Connection` we should try to have at most one instance 
for the entire process per connection factory. 
https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that the 
connection should be re-used. Having fewer connections would likely be 
beneficial, unless you think there would be a performance limitation in using a 
single connection per process for all the messages?
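For illustration, a process-wide one-connection-per-factory cache could look like the following minimal Python sketch. The stub classes are hypothetical, not Beam's actual JmsIO code; the real implementation would hold a JMS `Connection` per `ConnectionFactory`.

```python
import threading


class StubFactory:
    """Stands in for a JMS ConnectionFactory; illustrative only."""
    def create_connection(self):
        return object()  # stands in for an open JMS Connection


class ConnectionCache:
    """Process-wide cache: at most one open connection per connection factory."""
    def __init__(self):
        self._lock = threading.Lock()
        self._connections = {}

    def get(self, factory):
        # Lock so concurrent readers in the same process share one connection.
        with self._lock:
            conn = self._connections.get(factory)
            if conn is None:
                conn = factory.create_connection()
                self._connections[factory] = conn
            return conn


CACHE = ConnectionCache()
```

Keying on the factory instance keeps distinct brokers (distinct factories) on distinct connections while all readers for one factory share a single connection.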

For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope they 
should have. Some ideas:
1. we could make it so that each `Session` is unique per checkpoint, e.g. we 
hand off the ownership of the `Session` to the JmsCheckpointMark every time we 
checkpoint and create a new `Session` for the next set of messages we receive. 
This would mean that we would also close the `MessageConsumer` at every 
checkpoint and create a new one.
2. we could make it so that the `Session` is scoped to the reader start/close 
and possibly multiple checkpoint marks and effectively close the `Session` once 
the reader is closed and all checkpoint marks are finalized/expired. We would 
close the `MessageConsumer` whenever the reader is closed.
3. we could make it so that the `Session` is scoped to the `Connection` and 
would only close it when the `Connection` closes.

Option 1 seems pretty simple since the `Session` is always owned by a single 
distinct owner. This seems like it would make the most sense if `Session` 
creation and management were cheap. Another positive is that once the `Session` 
closes, any messages that weren't acknowledged are returned to the queue, so we 
will not have to wait for the reader to be closed or all the checkpoint marks 
to be finalized.
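For comparison, option 2's close condition (the reader is closed and all outstanding checkpoint marks are finalized or expired) can be modeled with simple reference counting. This is a purely illustrative Python sketch with stub classes, not Beam's actual API.

```python
class SharedSession:
    """Stub session shared by a reader and its outstanding checkpoint marks."""
    def __init__(self):
        self.refs = 1  # the reader itself holds one reference
        self.closed = False

    def retain(self):
        self.refs += 1

    def release(self):
        # Close only when the reader and every checkpoint mark have released.
        self.refs -= 1
        if self.refs == 0:
            self.closed = True  # stands in for session.close()


class CheckpointMark:
    """Each checkpoint mark pins the session until it is finalized."""
    def __init__(self, session):
        self.session = session
        session.retain()

    def finalize(self):
        self.session.release()
```

Under this scheme the session survives the reader's close while any mark is pending, which matches option 2 but means unacknowledged messages are not returned to the queue until the last mark is finalized or expires.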

What do you think?

On Mon, Aug 29, 2022 at 10:06 PM Jean-Baptiste Onofré
 wrote:
Hi Vincent,

thanks, I will take a look (as