+1, that is what I had in mind; if I recall correctly, this is what the es-hadoop connector does.

On 15/11/2017 at 20:22, Tim Robertson wrote:
Hi Chet,

I'll be a user of this, so thank you.

It seems reasonable, although - did you consider letting folks name the document ID field explicitly?  It would avoid an unnecessary transformation and might be simpler:
   // instruct the writer to use a provided document ID
   ElasticsearchIO.write()
       .withConnectionConfiguration(conn)
       .withMaxBatchSize(BATCH_SIZE)
       .withDocumentIdField("myID");

On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich <[email protected]> wrote:

    Given that this seems like a change that should probably happen,
    and I’d like to help contribute if possible, here are a few
    questions and my current opinion:

    So I’m leaning towards approach B here, which is:

    b. (a bit less user friendly) PCollection<KV> with K as an id,
    but this forces the user to do a ParDo before writing to ES to
    output KV pairs of <id, json>

    I think that the reduction in user-friendliness may be outweighed
    by the fact that this obviates some of the issues surrounding a
    failure when finishing a bundle. Additionally, this /forces/ the
    user to provide a document id, which I think is probably better
    practice. It will also probably lead to fewer frustrations around
    “magic” code that just pulls something in if it happens to be
    there, and doesn’t if not. With an implicit approach, we’d need to
    rely on the user discovering this functionality in the docs or the
    code itself in order to take advantage of it.

    IMHO it’d be generally better to enforce this at compile time,
    because it affects whether the pipeline produces duplicates on
    failure. Additionally, we get the benefit of relatively intuitive
    behavior: if the user passes in the same key value, it’ll update a
    record in ES, and if the key is different then it will create a
    new record.
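
    To make option (b) concrete, the user-side extraction that the
    ParDo would wrap might look roughly like the sketch below. The
    `myId` field name and the helper are hypothetical, purely for
    illustration; in a real pipeline this logic would live in a DoFn
    emitting KV.of(id, json) ahead of ElasticsearchIO.write():

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IdExtraction {

    // Hypothetical field name; in a real DoFn this would be configurable.
    private static final Pattern ID_FIELD =
        Pattern.compile("\"myId\"\\s*:\\s*\"([^\"]+)\"");

    // Turn a JSON document into an (id, json) pair, failing fast if the
    // id is missing -- mirroring the "force the user to provide an id"
    // behavior discussed above.
    static Map.Entry<String, String> toIdJsonPair(String json) {
        Matcher m = ID_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("document has no myId field: " + json);
        }
        return new AbstractMap.SimpleEntry<>(m.group(1), json);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv =
            toIdJsonPair("{\"myId\":\"doc-42\",\"title\":\"hello\"}");
        System.out.println(kv.getKey());  // doc-42
    }
}
```

    Failing fast on a missing id is one possible design choice here;
    it surfaces bad records at the ParDo rather than silently falling
    back to auto-generated ids.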

    Curious to hear thoughts on this. If this seems reasonable I’ll go
    ahead and create a JIRA for tracking and start working on a PR for
    this. Also, if it’d be good to loop in the dev mailing list before
    starting let me know, I’m pretty new to this.

    Chet

    On Nov 15, 2017, at 12:53 AM, Etienne Chauchot
    <[email protected]> wrote:

    Hi Chet,

    What you say is totally true: docs written using ElasticsearchIO
    will always have an ES-generated id. But it might change in the
    future; indeed, it might be a good thing to allow the user to pass
    an id. Off the top of my head, I see 3 possible designs for that.

    a. (simplest) use a special JSON field for the id: if it is
    provided by the user in the input JSON then it is used, and an
    auto-generated id is used otherwise.

    b. (a bit less user friendly) PCollection<KV> with K as an id,
    but this forces the user to do a ParDo before writing to ES to
    output KV pairs of <id, json>

    c. (a lot more complex) allow the IO to serialize/deserialize
    Java beans that have a String id field. Matching Java types to ES
    types is quite tricky, so, for now, we just relied on the user to
    serialize their beans into JSON and let ES match the types
    automatically.
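
    For what it's worth, the option (a) lookup could be sketched
    roughly as below, assuming a reserved field name such as "_id"
    (the field name and helper are hypothetical, not existing IO
    code): a present value means "use the user-supplied id", an empty
    result means "let ES auto-generate one", which is today's
    behavior.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SpecialIdField {

    // Hypothetical reserved field for option (a).
    private static final Pattern RESERVED_ID =
        Pattern.compile("\"_id\"\\s*:\\s*\"([^\"]+)\"");

    // Present => use the user-supplied id; empty => fall back to the
    // current behavior of letting Elasticsearch auto-generate one.
    static Optional<String> extractId(String json) {
        Matcher m = RESERVED_ID.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(extractId("{\"_id\":\"a1\",\"x\":1}")); // Optional[a1]
        System.out.println(extractId("{\"x\":1}"));                // Optional.empty
    }
}
```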

    Related to the problems you raise below:

    1. Well, the bundle is the commit entity of Beam. Consider the
    case of ESIO.batchSize being smaller than the bundle size. While
    processing records, when the number of elements reaches batchSize,
    an ES bulk insert will be issued, but no finishBundle. If there is
    a problem later on in the bundle processing, before the
    finishBundle, the checkpoint will still be at the beginning of the
    bundle, so the whole bundle will be retried, leading to duplicate
    documents. Thanks for raising that! I'm CCing the dev list so that
    someone can correct me on the checkpointing mechanism if I'm
    missing something. Besides, I'm thinking about forcing the user to
    provide an id in all cases to work around this issue.

    2. Correct.

    Best,
    Etienne

    On 15/11/2017 at 02:16, Chet Aldrich wrote:
    Hello all!

    So I’ve been using the ElasticsearchIO sink for a project
    (unfortunately it’s Elasticsearch 5.x, so I’ve been messing around
    with the latest RC) and I’m finding that it doesn’t allow for
    changing the document ID; it only lets you pass in a record, which
    means that the document ID is auto-generated. See this line for
    what specifically is happening:

    
    https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838

    Essentially, the data part of the document is being placed, but it
    doesn’t allow for other properties, such as the document ID, to be
    set.

    This leads to two problems:

    1. Beam doesn’t necessarily guarantee exactly-once execution for
    a given item in a PCollection, as I understand it. This means
    that you may get more than one record in Elastic for a given
    item in a PCollection that you pass in.

    2. You can’t do partial updates to an index. If you run a batch
    job once, and then run the batch job again on the same index
    without clearing it, you just double everything in there.
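
    To illustrate where the duplication comes from: each document in
    an ES _bulk request is preceded by an action metadata line, and
    without an "_id" in it, ES mints a fresh id on every run, so
    re-running the batch piles up copies. With an explicit "_id", the
    same run would overwrite in place. A hand-rolled sketch of the two
    shapes (this follows the real bulk API format, but is not actual
    IO code):

```java
public class BulkActionLine {

    // The metadata line preceding each document in an ES _bulk body.
    // id == null reproduces the current behavior (ES auto-generates an
    // id, so re-runs duplicate); a non-null id makes the write
    // idempotent, since indexing the same _id twice overwrites.
    static String actionLine(String index, String type, String id) {
        if (id == null) {
            return String.format(
                "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\"}}", index, type);
        }
        return String.format(
            "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}",
            index, type, id);
    }

    public static void main(String[] args) {
        System.out.println(actionLine("my-index", "doc", null));
        System.out.println(actionLine("my-index", "doc", "42"));
    }
}
```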

    Is there any good way around this?

    I’d be happy to try writing up a PR for this in theory, but not
    sure how to best approach it. Also would like to figure out a
    way to get around this in the meantime, if anyone has any ideas.

    Best,

    Chet

    P.S. CCed [email protected] because it seems like he’s been
    doing work related to the Elasticsearch sink.





