I think it's the most elegant approach: the user should be able to decide which id field they want to use.

Regards
JB

On 11/16/2017 09:24 AM, Etienne Chauchot wrote:
+1, that is what I had in mind; if I recall correctly, this is what the es_hadoop connector does.


On 15/11/2017 at 20:22, Tim Robertson wrote:
Hi Chet,

I'll be a user of this, so thank you.

It seems reasonable, although - did you consider letting folks name the document ID field explicitly?  It would avoid an unnecessary transformation and might be simpler:
   // instruct the writer to use a provided document ID
   ElasticsearchIO.write()
       .withConnectionConfiguration(conn)
       .withMaxBatchSize(BATCH_SIZE)
       .withDocumentIdField("myID");

On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich <[email protected]> wrote:

    Given that this seems like a change that should probably happen, and I’d
    like to help contribute if possible, a few questions and my current opinion:

    So I’m leaning towards approach B here, which is:

    b. (a bit less user friendly) PCollection<KV> with K as an id. But this forces
    the user to do a ParDo before writing to ES to output KV pairs of <id, json>

    I think that the reduction in user-friendliness may be outweighed by the
    fact that this obviates some of the issues surrounding a failure when
    finishing a bundle. Additionally, this /forces/ the user to provide a
    document id, which I think is probably better practice. It will also
    probably lead to fewer frustrations around “magic” code that just pulls
    something in if it happens to be there and doesn’t if not, where we’d
    need to rely on the user catching this functionality in the docs or the
    code itself to take advantage of it.

    IMHO it’d be generally better to enforce this at compile time because it
    does have an effect on whether the pipeline produces duplicates on
    failure. Additionally, we get the benefit of relatively intuitive behavior
    where if the user passes in the same Key value, it’ll update a record in
    ES, and if the key is different then it will create a new record.

    Curious to hear thoughts on this. If this seems reasonable I’ll go ahead
    and create a JIRA for tracking and start working on a PR for this. Also,
    if it’d be good to loop in the dev mailing list before starting let me
    know, I’m pretty new to this.

    Chet

    On Nov 15, 2017, at 12:53 AM, Etienne Chauchot <[email protected]> wrote:

    Hi Chet,

    What you say is totally true: docs written using ElasticsearchIO will
    always have an ES-generated id. But it might change in the future; indeed,
    it might be a good thing to allow the user to pass an id. After just a few
    seconds' thought, I see 3 possible designs for that.

    a. (simplest) use a special JSON field for the id: if it is provided by
    the user in the input JSON then it is used, otherwise the id is auto-generated.

    b. (a bit less user friendly) PCollection<KV> with K as an id. But this forces
    the user to do a ParDo before writing to ES to output KV pairs of <id, json>

    c. (a lot more complex) Allow the IO to serialize/deserialize Java beans
    that have a String id field. Matching Java types to ES types is quite
    tricky, so, for now, we just rely on the user to serialize their beans
    into JSON and let ES match the types automatically.
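    To make option (b) concrete, here is a minimal sketch of the logic a
    user's ParDo could run before the write: pull an id out of the raw JSON
    record and emit an (id, json) pair. This is plain Java, not Beam code
    (the DoFn wrapper is omitted), the field name "id" is a hypothetical
    convention, and a real implementation would use a JSON parser rather
    than a regex.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExtractId {
    // Naive extraction of a top-level "id" string field from a JSON document.
    // Dependency-free on purpose; a real ParDo would use a proper JSON parser.
    private static final Pattern ID_FIELD =
        Pattern.compile("\"id\"\\s*:\\s*\"([^\"]+)\"");

    // The body a user-written ParDo could run for option (b): turn a raw
    // JSON record into an (id, json) pair before handing it to the writer.
    static Map.Entry<String, String> toIdJsonPair(String json) {
        Matcher m = ID_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException(
                "input record has no \"id\" field: " + json);
        }
        return new SimpleEntry<>(m.group(1), json);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> pair =
            toIdJsonPair("{\"id\":\"doc-42\",\"name\":\"test\"}");
        System.out.println(pair.getKey() + " -> " + pair.getValue());
    }
}
```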

    Related to the problems you raise below:

    1. Well, the bundle is the commit entity of Beam. Consider the case of
    ESIO.batchSize being smaller than the bundle size. While processing
    records, when the number of elements reaches batchSize, an ES bulk insert
    will be issued, but no finishBundle. If there is a problem later on in
    the bundle processing, before the finishBundle, the checkpoint will still
    be at the beginning of the bundle, so the whole bundle will be retried,
    leading to duplicate documents. Thanks for raising that! I'm CCing the
    dev list so that someone can correct me on the checkpointing mechanism if
    I'm missing something. Besides, I'm thinking about forcing the user to
    provide an id in all cases to work around this issue.
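    To illustrate why user-provided ids work around the retry problem, here
    is a dependency-free sketch (not Beam or ES client code) that models the
    index as a map keyed by _id: replaying a batch with explicit ids is
    idempotent, while auto-generated ids duplicate every document on each
    attempt.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class RetrySketch {
    // Model the ES index as a map from _id to document source.
    static void bulkIndexWithIds(Map<String, String> index, Map<String, String> batch) {
        index.putAll(batch); // same _id overwrites: replays are idempotent
    }

    static void bulkIndexAutoIds(Map<String, String> index, List<String> batch) {
        for (String doc : batch) {
            index.put(UUID.randomUUID().toString(), doc); // fresh _id per attempt
        }
    }

    public static void main(String[] args) {
        Map<String, String> withIds = new HashMap<>();
        Map<String, String> batch = Map.of("doc-1", "{...}", "doc-2", "{...}");
        bulkIndexWithIds(withIds, batch);
        bulkIndexWithIds(withIds, batch); // bundle retried after a failure
        System.out.println(withIds.size()); // still 2

        Map<String, String> autoIds = new HashMap<>();
        List<String> rawBatch = List.of("{...}", "{...}");
        bulkIndexAutoIds(autoIds, rawBatch);
        bulkIndexAutoIds(autoIds, rawBatch); // retry duplicates everything
        System.out.println(autoIds.size()); // 4
    }
}
```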

    2. Correct.

    Best,
    Etienne

    On 15/11/2017 at 02:16, Chet Aldrich wrote:
    Hello all!

    So I’ve been using the ElasticSearchIO sink for a project (unfortunately
    it’s Elasticsearch 5.x, and so I’ve been messing around with the latest
    RC) and I’m finding that it doesn’t allow for changing the document ID,
    but only lets you pass in a record, which means that the document ID is
    auto-generated. See this line for what specifically is happening:

    
https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838

    Essentially, only the data part of the document is written; other
    properties, such as the document ID, cannot be set.
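    For context, the difference sits entirely in the bulk action metadata
    line the sink sends ahead of each document. A minimal sketch of the two
    shapes, per the Elasticsearch bulk API (plain string building, no client
    library):

```java
public class BulkMetadata {
    // Action line without an _id: ES generates a fresh id on every attempt.
    static String autoIdAction() {
        return "{ \"index\" : {} }";
    }

    // Action line with an explicit document id: indexing the same _id twice
    // updates the existing document instead of creating a duplicate.
    static String explicitIdAction(String id) {
        return String.format("{ \"index\" : { \"_id\" : \"%s\" } }", id);
    }

    public static void main(String[] args) {
        System.out.println(autoIdAction());
        System.out.println(explicitIdAction("doc-42"));
    }
}
```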

    This leads to two problems:

    1. Beam doesn’t necessarily guarantee exactly-once execution for a given
    item in a PCollection, as I understand it. This means that you may get
    more than one record in Elastic for a given item in a PCollection that
    you pass in.

    2. You can’t do partial updates to an index. If you run a batch job
    once, and then run the batch job again on the same index without
    clearing it, you just double everything in there.

    Is there any good way around this?

    I’d be happy to try writing up a PR for this in theory, but not sure how
    to best approach it. Also would like to figure out a way to get around
    this in the meantime, if anyone has any ideas.

    Best,

    Chet

    P.S. CCed [email protected] because it
    seems like he’s been doing work related to the elastic sink.







--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
