Re: kafkaIO Consumer Rebalance with Spark Runner

2019-01-25 Thread Raghu Angadi
You have 32 partitions. Reading cannot be distributed to more than 32
parallel tasks.
If you have a lot of processing for each record after reading, you can
reshuffle the messages before processing them; that way the processing
can be distributed to more tasks. Search for previous threads about
reshuffle on the Beam lists.
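
To make that concrete, here is a minimal sketch of the reshuffle approach,
assuming Beam's Reshuffle.viaRandomKey() transform. The Kafka settings are
simplified from the snippet quoted below, and ExpensiveProcessingFn is just a
placeholder for your own DoFn:

    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("ubuntu7:9092")
            .withTopic("kafkasink32")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Redistribute records across workers so the heavy per-record work is
        // no longer fused to (and limited by) the 32 Kafka read tasks.
        .apply(Reshuffle.viaRandomKey())
        .apply("Process", ParDo.of(new ExpensiveProcessingFn()));

    p.run().waitUntilFinish();

The read itself still uses at most 32 tasks; only the processing after the
reshuffle can spread across all executors.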

On Thu, Jan 24, 2019 at 7:23 PM  wrote:

> Dear all,
>
>
>
> I am using the KafkaIO SDK in my project (Beam with the Spark runner).
>
>
>
> The problem of task skew is shown in the following figure.
>
>
>
> My running environment is:
>
> OS: Ubuntu 14.04.4 LTS
>
> The version of related tools is:
>
> java version: "1.8.0_151"
>
> Beam version: 2.9.0 (Spark runner with Standalone mode)
>
> Spark version: 2.3.1 Standalone mode
>
> Execution condition:
>
> Master/Driver node: ubuntu7
>
> Worker nodes: ubuntu8 (4 Executors); ubuntu9 (4 Executors)
>
> The number of executors is 8
>
>
>
> Kafka Broker: 2.10-0.10.1.1
>
> Broker node at ubuntu7
>
> Kafka Client:
>
> The topic: kafkasink32
>
> kafkasink32 Partitions: 32
>
>
>
> The code in my project that uses the KafkaIO SDK is as follows:
>
>
> ==
>
> Map<String, Object> map = ImmutableMap.<String, Object>builder()
>     .put("group.id", (Object) "test-consumer-group")
>     .build();
>
> List<TopicPartition> topicPartitions = new ArrayList<>();
> for (int i = 0; i < 32; i++) {
>     topicPartitions.add(new TopicPartition("kafkasink32", i));
> }
>
> PCollection<KV<Long, String>> readKafkaData = p.apply(
>     KafkaIO.<Long, String>read()
>         .withBootstrapServers("ubuntu7:9092")
>         .updateConsumerProperties(map)
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withTopicPartitions(topicPartitions)
>         .withoutMetadata());
>
> ==
>
> Here I have two directions to solve this problem:
>
>
>
> 1.  Using the following feature from Spark Streaming:
>
> https://jaceklaskowski.gitbooks.io/spark-streaming/spark-streaming-kafka-LocationStrategy.html
>
> LocationStrategies.PreferConsistent: use in most cases, as it consistently
> distributes partitions across all executors.
>
> If we would like to use this feature, we have no idea how to set it through
> the KafkaIO SDK.
>
>
>
> 2.  Setting the related Kafka configurations to perform a consumer rebalance
>
> Should we set the consumer group? Set group.id?
>
> If we need to go with No. 2, could someone give us some ideas on which
> configurations to set?
>
>
>
> If anyone can provide any direction to help us overcome this problem, we
> would appreciate it.
>
>
>
> Thanks.
>
>
>
> Sincerely yours,
>
>
>
> Rick
>
>
>
>
>


How to call Oracle stored proc in JdbcIO

2019-01-25 Thread Rui Wang
Hi Community,

There is a Stack Overflow question [1] asking how to call an Oracle stored
procedure in Beam via JdbcIO. I don't know much about JdbcIO or Oracle, so I am
asking here: does anyone know whether JdbcIO supports calling stored procedures?

If there is no such support, I will create a JIRA for it and reply to the
question.

 -Rui

[1]:
https://stackoverflow.com/questions/54364783/how-to-call-an-oracle-stored-proc-in-apache-beam
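
For what it's worth, a hedged sketch of what one could try with the existing
JdbcIO.read() API, assuming the Oracle JDBC driver accepts a stored-procedure
call expressed as a plain query string and that the proc returns a result set
(untested; the connection settings and proc name below are hypothetical):

    PCollection<String> rows =
        pipeline.apply(
            JdbcIO.<String>read()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                            "oracle.jdbc.OracleDriver",
                            "jdbc:oracle:thin:@//dbhost:1521/ORCL") // hypothetical
                        .withUsername("user")
                        .withPassword("password"))
                // Hypothetical stored-procedure call issued as a query string.
                .withQuery("{ call my_schema.my_proc() }")
                .withRowMapper(rs -> rs.getString(1))
                .withCoder(StringUtf8Coder.of()));

If the driver rejects that, there is most likely no dedicated support today and
a JIRA sounds like the right next step.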


Re: ParquetIO write of CSV document data

2019-01-25 Thread Alexey Romanenko
Hi 

Great that this example helped you. 
Also, as I can see, Jeff Klukas already answered your questions in the other 
thread “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.

> On 24 Jan 2019, at 03:44, Sridevi Nookala wrote:
> 
> Hi Alex,
> 
> Thanks for the suggestion. I tried it like in the GitHub example, by inferring
> the Avro schema:
>
> PCollection<String> input =
>     pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
> input
>     .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
>     .setCoder(AvroCoder.of(SCHEMA))
>     .apply(
>         "Write Parquet files",
>         FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
>
> pipeline.run();
>
> I have 2 simple questions:
> How can I disable sharding with FileIO.write()? I want a single Parquet file
> from a single CSV.
> How can I change the above to use a custom name for my Parquet file?
> Do I have to use FileIO.writeDynamic()?
> 
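For reference, a hedged sketch of how those questions could be addressed with
FileIO.write(), assuming its withNumShards and withNaming options (the file
name below is just an illustration):

    input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
        .setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write single Parquet file",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(SCHEMA))
                .to("/tmp/parquet/")
                // One shard => one output file (at the cost of write parallelism).
                .withNumShards(1)
                // Stable custom name instead of the default sharded pattern.
                .withNaming(FileIO.Write.defaultNaming("input-converted", ".parquet")));

FileIO.writeDynamic() should only be needed when the destination or naming has
to depend on the element itself.
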
> Hi Lucas,
>
> I am a newbie, so I am not there yet to take on Beam JIRAs, but it would help
> immensely if the Avro schema inference could be avoided,
> something like what Python pandas/pyarrow does.
>
> Thanks for your help,
> Sri
> 
> From: Łukasz Gajowy 
> Sent: Tuesday, January 15, 2019 7:02:56 AM
> To: user@beam.apache.org
> Subject: Re: ParquetIO write of CSV document data
>  
> Hi Sri, 
> 
> It's exactly as Alexey says, although there are plans/ideas to improve
> ParquetIO in a way that would not require defining the schema manually.
>
> Some JIRAs that might be interesting on this topic but are not yet resolved
> (maybe you are willing to contribute?):
> https://issues.apache.org/jira/browse/BEAM-4454 
> 
> https://issues.apache.org/jira/browse/BEAM-4812 
> 
> https://issues.apache.org/jira/browse/BEAM-6394 
> 
> 
> Thanks, 
> Łukasz
> 
> On Mon, Jan 14, 2019 at 19:16 Alexey Romanenko wrote:
> Hi Sri,
> 
> AFAIK, you have to create a PCollection of GenericRecords and define your
> Avro schema manually to write your data into Parquet files.
> In this case, you will need to create a ParDo for this translation. Also, I 
> expect that your schema is the same for all CSV files.
> 
> Basic example of using Parquet Sink with Java SDK could be found here [1]
> 
> [1] https://git.io/fhcfV 
> 
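As a rough illustration of that translation ParDo (a sketch only: the schema,
field names and CSV layout are made up, and header/error handling is omitted):

    class CsvToGenericRecordFn extends DoFn<String, GenericRecord> {
      // Hypothetical schema for a CSV with columns "name,age"; adjust to your files.
      static final String SCHEMA_JSON =
          "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
              + "{\"name\":\"name\",\"type\":\"string\"},"
              + "{\"name\":\"age\",\"type\":\"int\"}]}";

      private transient Schema schema;

      @Setup
      public void setup() {
        // Avro's Schema is not serializable, so parse it on the worker.
        schema = new Schema.Parser().parse(SCHEMA_JSON);
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        String[] parts = c.element().split(",", -1);
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", parts[0]);
        record.put("age", Integer.parseInt(parts[1]));
        c.output(record);
      }
    }

The resulting PCollection<GenericRecord> then needs its coder set explicitly,
e.g. setCoder(AvroCoder.of(new Schema.Parser().parse(CsvToGenericRecordFn.SCHEMA_JSON))),
before handing it to ParquetIO.sink(...).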
> 
>> On 14 Jan 2019, at 02:00, Sridevi Nookala wrote:
>> 
>> Hi,
>> 
>> I have a bunch of CSV data files that I need to store in Parquet format. I
>> did look at the basic documentation on ParquetIO, and ParquetIO.sink() can be
>> used to achieve this.
>> However, there is a dependency on the Avro schema.
>> How do I infer/generate an Avro schema from CSV document data?
>> Does Beam have any API for this?
>> I tried using the Kite SDK's CSVUtil / JsonUtil but had no luck generating an
>> Avro schema.
>> My CSV data files have headers in them, and quite a few of the header fields
>> are hyphenated, which Kite's CSVUtil does not like.
>> 
>> I think it would be a redundant effort to convert the CSV documents to JSON
>> documents.
>> Any suggestions on how to infer an Avro schema from CSV data or a JSON schema
>> would be helpful.
>> 
>> Thanks,
>> Sri



No checkpoints for KafkaIO to BigQueryIO pipeline on Flink runner?

2019-01-25 Thread Kaymak, Tobias
Hi,

I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
pipeline, to skip the load step from GCS which is currently handled
outside of Beam.

I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger
any checkpoints on Flink (1.5.5), even though it's configured to do so when I
launch it. Is this normal? How does Beam then guarantee exactly-once when
there are no checkpoints in Flink? (It seems to start from scratch when it
crashes during my tests, but I am not 100% sure.)


This is my pipeline:

 pipeline
     .apply(
         KafkaIO.read()
             .withBootstrapServers(bootstrap)
             .withTopics(topics)
             .withKeyDeserializer(StringDeserializer.class)
             .withValueDeserializer(ConfigurableDeserializer.class)
             .updateConsumerProperties(
                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
             .withReadCommitted()
             .withTimestampPolicyFactory(withEventTs)
             .commitOffsetsInFinalize())
     .apply(ParDo.of(new ToEventFn()))
     .apply(
         Window.into(new ZurichTimePartitioningWindowFn())
             .triggering(
                 Repeatedly.forever(
                     AfterFirst.of(
                         AfterPane.elementCountAtLeast(bundleSize),
                         AfterProcessingTime.pastFirstElementInPane()
                             .plusDelayOf(refreshFrequency))))
             .withAllowedLateness(Duration.standardDays(14))
             .discardingFiredPanes())
     .apply(
         BigQueryIO.write()
             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
             .withTriggeringFrequency(refreshFrequency)
             .withNumFileShards(1)
             .to(partitionedTableDynamicDestinations)
             .withFormatFunction(
                 (SerializableFunction) KafkaToBigQuery::convertUserEventToTableRow)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

 pipeline.run().waitUntilFinish();

It's launched like the other (GCS) one via:

...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=30
--parallelism=1 --tempLocation=gs://foo..
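
For completeness, a sketch of setting the same options programmatically instead
of via flags (assuming Beam's FlinkPipelineOptions, where, as far as I can tell,
the checkpointing interval is specified in milliseconds):

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    // Assumption: milliseconds, so 30_000 means a checkpoint every 30 seconds.
    options.setCheckpointingInterval(30_000L);
    options.setParallelism(1);

    Pipeline pipeline = Pipeline.create(options);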

Any idea why checkpointing does not work here?

Best,
Tobias