I've found that this was simply a matter of pipeline processing speed. I 
removed many layers of transforms so that entities flowed through the 
pipeline faster, and the batch sizes increased. I may build a 
separate pipeline to take full advantage of batch indexing.
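Here is a simplified way to picture why speeding up the pipeline changed the batch sizes. Assume, as Tim's reply below suggests, that the writer buffers documents only within a bundle. It flushes when the buffer reaches a maximum batch size or when the bundle finishes. Under that assumption, single-element bundles produce one bulk request per document. The sketch is a hypothetical, stdlib-only Java model, not ElasticsearchIO's code; `BundleBatchingModel` and its methods are illustrative names. It only counts how many documents each simulated request would carry.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a bundle-scoped batching writer. This is NOT
 * ElasticsearchIO's source; it only mimics the assumption that documents are
 * buffered within a bundle and flushed when the batch is full or the bundle ends.
 */
public class BundleBatchingModel {

    private final int maxBatchSize;
    private final List<String> batch = new ArrayList<>();
    // Number of documents carried by each simulated bulk request.
    private final List<Integer> requestSizes = new ArrayList<>();

    public BundleBatchingModel(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Analogous to a DoFn's @StartBundle: begin with an empty buffer.
    void startBundle() {
        batch.clear();
    }

    // Analogous to @ProcessElement: buffer the document, flush if the batch is full.
    void processElement(String jsonDoc) {
        batch.add(jsonDoc);
        if (batch.size() >= maxBatchSize) {
            flushBatch();
        }
    }

    // Analogous to @FinishBundle: whatever is still buffered is sent now.
    void finishBundle() {
        flushBatch();
    }

    // Records one "request" containing the buffered documents, then empties the buffer.
    private void flushBatch() {
        if (batch.isEmpty()) {
            return;
        }
        requestSizes.add(batch.size());
        batch.clear();
    }

    /** Feeds the given bundles through a fresh model and returns the request sizes. */
    public static List<Integer> simulate(List<List<String>> bundles, int maxBatchSize) {
        BundleBatchingModel writer = new BundleBatchingModel(maxBatchSize);
        for (List<String> bundle : bundles) {
            writer.startBundle();
            for (String doc : bundle) {
                writer.processElement(doc);
            }
            writer.finishBundle();
        }
        return writer.requestSizes;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            docs.add("{\"id\":" + i + "}");
        }
        // Six bundles holding one document each.
        List<List<String>> tinyBundles = new ArrayList<>();
        for (String d : docs) {
            tinyBundles.add(List.of(d));
        }
        // A single bundle holding all six documents.
        List<List<String>> oneBundle = List.of(docs);

        System.out.println(simulate(tinyBundles, 1000)); // [1, 1, 1, 1, 1, 1]
        System.out.println(simulate(oneBundle, 1000));   // [6]
        System.out.println(simulate(oneBundle, 4));      // [4, 2]
    }
}
```

The same six documents produce six requests when each arrives in its own bundle. They produce one request when they share a bundle, or two when the batch cap is 4. The runner chooses bundle sizes, so they can differ between the DirectRunner and the DataflowRunner.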

Thanks!

On 2018/12/07 14:36:44, Evan Galpin wrote: 
> I forgot to reiterate that the PCollection on which EsIO operates is of type 
> String, where each element is a valid JSON document serialized _without_ 
> pretty-printing (i.e. without line breaks). If the PCollection should be of a 
> different type, please let me know. From the EsIO source code, I believe a 
> PCollection of String is correct.
> 
> On 2018/12/07 14:33:17, Evan Galpin wrote: 
> > Thanks for confirming that this is unexpected behaviour, Tim; the EsIO 
> > code certainly appears to handle bundling. For the record, I've also 
> > confirmed via the debugger that `flushBatch()` is not being triggered by 
> > large document size.
> > 
> > I'm sourcing records from Google's BigQuery. I have 2 queries, each of 
> > which creates a PCollection. I use a JacksonFactory to convert BigQuery 
> > results to a valid JSON string (confirmed valid via debugger + linter). I 
> > have a few transforms that group the records from the 2 queries together 
> > and then convert them to a JSON string again via Jackson. I do know that 
> > the system creates valid requests to the bulk API; it's just that each 
> > request contains only 1 document.
> > 
> > Thanks for starting the process with this. If there are other specific 
> > details that I can provide to be helpful, please let me know. Here are the 
> > versions of modules I'm using now:
> > 
> > Beam SDK (beam-sdks-java-core): 2.8.0
> > EsIO (beam-sdks-java-io-elasticsearch): 2.8.0
> > BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0
> > DirectRunner (beam-runners-direct-java): 2.8.0
> > DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0
> > 
> > 
> > On 2018/12/07 06:36:58, Tim wrote: 
> > > Hi Evan,
> > > 
> > > That is definitely not the expected behaviour, and I believe it is 
> > > covered by tests that use the DirectRunner. Are you able to share your 
> > > pipeline code, or describe how you source your records, please? It could 
> > > be that something else is causing EsIO to see bundles of only one record.
> > > 
> > > I’ll also verify the EsIO behaviour when I get to a computer.
> > > 
> > > Tim (on phone)
> > > 
> > > > On 6 Dec 2018, at 22:00, Evan Galpin wrote:
> > > > 
> > > > Hi all,
> > > > 
> > > > I’m having a bit of trouble with the ElasticsearchIO Write transform. 
> > > > I’m able to successfully index documents into my Elasticsearch 
> > > > cluster, but batching does not seem to work. There ends up being a 1:1 
> > > > ratio between HTTP requests sent to `/my-index/_doc/_bulk` and the 
> > > > number of documents in the PCollection to which I apply the 
> > > > ElasticsearchIO PTransform. I’ve observed this specifically under the 
> > > > DirectRunner by using a debugger.
> > > > 
> > > > Am I missing something? Is this possibly a difference between execution 
> > > > environments (e.g. DirectRunner vs. DataflowRunner)? How can I make sure 
> > > > my program is taking advantage of batching/bulk indexing?
> > > > 
> > > > Thanks,
> > > > Evan
> > > 
> > 
> 

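This relates to the point in the thread about each element being a single-line JSON string. The Elasticsearch `_bulk` API takes newline-delimited JSON: an `index` action line is followed by the document source on the next line, and the body must end with a newline. A pretty-printed document would therefore break the format. Below is a hypothetical helper, not part of ElasticsearchIO; `BulkBodySketch.buildBulkBody` is an illustrative name. It assumes the index and type are given in the URL (e.g. `/my-index/_doc/_bulk`), so each action line can be an empty `index` action.

```java
import java.util.List;

/**
 * Hypothetical helper (not part of ElasticsearchIO) that assembles an
 * Elasticsearch _bulk request body from single-line JSON documents.
 */
public class BulkBodySketch {

    /**
     * Each document becomes two newline-terminated lines: an "index" action
     * line and the document source. The index and type are assumed to be in
     * the request URL, so the action metadata object is left empty.
     */
    public static String buildBulkBody(List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            // A pretty-printed (multi-line) document would break the line-based format.
            if (doc.indexOf('\n') >= 0 || doc.indexOf('\r') >= 0) {
                throw new IllegalArgumentException("Document must be a single line: " + doc);
            }
            body.append("{\"index\":{}}\n");
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        String body = buildBulkBody(List.of("{\"id\":1}", "{\"id\":2}"));
        System.out.print(body);
    }
}
```

Every document adds exactly two lines to the body, so a batch of N documents becomes one request with 2N lines rather than N separate requests.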