The problem of kafkaIO sdk for data latency

2018-02-28 Thread linrick
Dear all,

I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).

While using this SDK, I have run into a data-latency situation, which I 
describe below.

The data arrive from Kafka at a fixed rate: 100 records per second.

I create a 1-second fixed window with no delay. I found that each window 
contains 70, 80, 104, or more records.

After about one day of running, data latency sets in, and each window 
contains only about 10 records.

To explain this clearly, I also provide my code below.
" PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
  p.apply(KafkaIO.<String, String>read()
      .withBootstrapServers("127.0.0.1:9092")
      .withTopic("kafkasink")
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .withoutMetadata())
   .apply(ParDo.of(
       new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
     @ProcessElement
     public void test(ProcessContext c) throws ParseException {
       String element = c.element().getValue();
       try {
         JsonNode arrNode = new ObjectMapper().readTree(element);
         String t = arrNode.path("v").findValue("Timestamp").textValue();
         DateTimeFormatter formatter =
             DateTimeFormatter.ofPattern("MM/dd/ HH:mm:ss.");
         LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
         java.time.Instant java_instant =
             dateTime.atZone(ZoneId.systemDefault()).toInstant();
         Instant timestamp = new Instant(java_instant.toEpochMilli());
         c.output(TimestampedValue.of(c.element(), timestamp));
       } catch (JsonGenerationException e) {
         e.printStackTrace();
       } catch (JsonMappingException e) {
         e.printStackTrace();
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
   }));

PCollection<TimestampedValue<KV<String, String>>> readDivideData =
  readData.apply(
    Window.<TimestampedValue<KV<String, String>>>into(
        FixedWindows.of(Duration.standardSeconds(1))
            .withOffset(Duration.ZERO))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.ZERO)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());"
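The timestamp conversion inside the DoFn can be checked in isolation with plain java.time. This is a minimal sketch under two assumptions: a full date pattern of "MM/dd/yyyy HH:mm:ss.SSS" (the pattern in the code above appears truncated by the mailing-list archive) and a fixed UTC zone instead of ZoneId.systemDefault(), so the result is deterministic:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampDemo {
    // Convert the JSON "Timestamp" string to epoch milliseconds, as the DoFn does.
    public static long toEpochMillis(String t) {
        // Assumed full pattern; the pattern in the original post
        // ("MM/dd/ HH:mm:ss.") looks truncated by the archive.
        DateTimeFormatter formatter =
            DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss.SSS");
        LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
        // A fixed zone makes the result deterministic; the original code
        // used ZoneId.systemDefault(), which shifts results per machine.
        return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("02/27/2018 02:00:49.117")); // prints 1519696849117
    }
}
```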

In addition, the runtime output is shown below.
"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=70
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one day:
data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999 "

To reproduce my situation, my environment is:

OS: Ubuntu 14.04.3 LTS

Java: JDK 1.7

Beam 2.0.0 (with the Direct runner)

Kafka 2.10-0.10.1.1

Maven 3.5.0, with the following dependencies in pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.0.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>

If you have any ideas about this data-latency problem, I look forward to 
hearing from you.

Thanks

Rick


--
This email may contain confidential information. Please do not use or 
disclose it in any way and delete it if you are not the intended recipient.


Re: BigQueryIO streaming inserts - poor performance with multiple tables

2018-02-28 Thread Chamikara Jayalath
Could be a DataflowRunner-specific issue. Would you mind reporting this,
with the corresponding Dataflow job IDs, to either the Dataflow Stack
Overflow channel [1] or dataflow-feedb...@google.com?

I suspect Dataflow splits the writes to multiple tables across multiple
workers, which may be what keeps all workers busy, but we have to look at
the job to confirm.

Thanks,
Cham

[1] https://stackoverflow.com/questions/tagged/google-cloud-dataflow

On Tue, Feb 27, 2018 at 11:56 PM Josh <jof...@gmail.com> wrote:

> Hi all,
>
> We are using BigQueryIO.write() to stream data into BigQuery, and are
> seeing very poor performance in terms of number of writes per second per
> worker.
>
> We are currently using *32* x *n1-standard-4* workers to stream ~15,000
> writes/sec to BigQuery. Each worker has ~90% CPU utilisation. Strangely the
> number of workers and worker CPU utilisation remains constant at ~90% even
> when the rate of input fluctuates down to below 10,000 writes/sec. The job
> always keeps up with the stream (no backlog).
>
> I've seen BigQueryIO benchmarks which show ~20k writes/sec being achieved
> with a single node, when streaming data into a *single* BQ table... So my
> theory is that writing to multiple tables is somehow causing the
> performance issue. Our writes are spread (unevenly) across 200+ tables. The
> job itself does very little processing, and looking at the Dataflow metrics
> pretty much all of the wall time is spent in the *StreamingWrite* step of
> BigQueryIO. The Beam version is 2.2.0.
>
> Our code looks like this:
>
> stream.apply(BigQueryIO.write()
> .to(new ToDestination())
> .withFormatFunction(new FormatForBigQuery())
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
> where ToDestination is a:
>
> SerializableFunction<ValueInSingleWindow, TableDestination>
>
> which returns a:
>
> new TableDestination(tableName, "")
>
> where tableName looks like "myproject:dataset.tablename$20180228"
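As a sketch of what such a destination function might look like (the element type TableRow and the "day" field are assumptions; the class and table names are illustrative, following Josh's example):

```java
// Hypothetical destination function: route each element to a
// day-partitioned table using BigQuery's "$yyyyMMdd" partition decorator.
static class ToDestination
    implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
  @Override
  public TableDestination apply(ValueInSingleWindow<TableRow> value) {
    // e.g. "20180228", assumed here to be carried on the element itself
    String day = (String) value.getValue().get("day");
    return new TableDestination("myproject:dataset.tablename$" + day, "");
  }
}
```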
>
> Has anyone else seen this kind of poor performance when streaming writes 
> to multiple BQ tables? Is there anything here that sounds wrong, or any 
> optimisations we can make?
>
> Thanks for any advice!
>
> Josh
>


Re: Running code before pipeline starts

2018-02-28 Thread Lukasz Cwik
You should use a side input and not an empty PCollection that you flatten.

Since
ReadA --> Flatten --> ParDo
ReadB -/
can be equivalently executed as:
ReadA --> ParDo
ReadB --> ParDo

Make sure you access the side input in case a runner evaluates the side
input lazily.

So your pipeline would look like:
Create --> ParDo(DoAction) --> View.asSingleton() named X
... --> ParDo(ProcessElements).withSideInput(X) --> ...
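A minimal Beam Java sketch of that shape (the names DoAction/ProcessElements and the String element types are illustrative, not from the thread; `p` is the Pipeline and `mainInput` the main PCollection):

```java
// Run the one-time action on a single seed element, then expose its result
// as a singleton side input that the main ParDo must read.
PCollectionView<String> actionDone =
    p.apply("Seed", Create.of("seed"))
     .apply("DoAction", ParDo.of(new DoFn<String, String>() {
       @ProcessElement
       public void process(ProcessContext c) {
         // one-time setup work goes here
         c.output("done");
       }
     }))
     .apply("AsSingleton", View.asSingleton());

mainInput.apply("ProcessElements", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // Access the side input so a lazily-evaluating runner is forced
        // to run DoAction before elements are processed.
        c.sideInput(actionDone);
        c.output(c.element());
      }
    }).withSideInputs(actionDone));
```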

An alternative would be to use CoGroupByKey to join the two streams, since
that execution cannot be split apart the way I showed with Flatten. Adding a
CoGroupByKey is wasteful, but it is a lot less wasteful if you convert a
preceding GroupByKey in your pipeline into a CoGroupByKey joining the two
streams.

On Wed, Feb 28, 2018 at 8:58 AM, Andrew Jones 
wrote:

> Hi,
>
> What is the best way to run code before the pipeline starts? Anything in
> the `main` function doesn't get called when the pipeline is run on Dataflow
> via a template - only the pipeline. If you're familiar with Spark, then I'm
> thinking of code that might be run in the driver.
>
> Alternatively, is there a way I can run part of a pipeline first, then run
> another part once it's completed? Not sure that makes sense, so to
> illustrate with a poor attempt at an ascii diagram, if I have something
> like this:
>
>          events
>          /    \
>         /      \
>        |    group by key
>        |        |
>        |    do some action
>        |       /
>        |      /
>   once action is complete,
>  process all original elements
>
> I can presumably achieve this by having `do some action` either generating
> an empty side input or an empty PCollection which I can then use to create
> a PCollectionList along with the original and pass to
> Flatten.pCollections() before continuing. Not sure if that's the best way
> to do it though.
>
> Thanks,
> Andrew
>


Running code before pipeline starts

2018-02-28 Thread Andrew Jones
Hi,

What is the best way to run code before the pipeline starts? Anything in the 
`main` function doesn't get called when the pipeline is run on Dataflow via a 
template - only the pipeline. If you're familiar with Spark, then I'm thinking 
of code that might be run in the driver.

Alternatively, is there a way I can run part of a pipeline first, then run 
another part once it's completed? Not sure that makes sense, so to illustrate 
with a poor attempt at an ascii diagram, if I have something like this:

         events
         /    \
        /      \
       |    group by key
       |        |
       |    do some action
       |       /
       |      /
  once action is complete,
 process all original elements

I can presumably achieve this by having `do some action` either generating an 
empty side input or an empty PCollection which I can then use to create a 
PCollectionList along with the original and pass to Flatten.pCollections() 
before continuing. Not sure if that's the best way to do it though.

Thanks,
Andrew