Jacob Gur created BEAM-7608:
-------------------------------

             Summary: v1new ReadFromDatastore skips entities
                 Key: BEAM-7608
                 URL: https://issues.apache.org/jira/browse/BEAM-7608
             Project: Beam
          Issue Type: Bug
          Components: io-python-gcp
    Affects Versions: 2.13.0
         Environment: MacOS 10.14.5, Python 2.7
            Reporter: Jacob Gur


A simple map over a Datastore kind in the local emulator using the new
v1new.datastoreio.ReadFromDatastore transform skips entities.

The kind has 1516 entities. When I map over it using the old
ReadFromDatastore transform, all of them are processed, i.e., I can map each
entity to its ID and write the IDs to a text file.

But the new transform only maps 365 entities, and no error is reported. The
tail of the standard output is:

```

INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 08:15:21+00:00
INFO:root:Estimated size bytes for query: 116188
INFO:root:Splitting the query into 12 splits
INFO:root:Running (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)+(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))+((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)+(ref_AppliedPTransform_GetEntities/Read_16)))+((ref_AppliedPTransform_MapToId_17)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)+(WriteToFile/Write/WriteImpl/GroupByKey/Write)))))
INFO:root:Running (WriteToFile/Write/WriteImpl/GroupByKey/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)+(ref_PCollection_PCollection_20/Write))
INFO:root:Running (ref_PCollection_PCollection_12/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)+(ref_PCollection_PCollection_21/Write))
INFO:root:Running (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:root:Renamed 1 shards in 0.12 seconds.

```
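The log reports that the query is split into 12 splits before the reads run. As a rough aid for reasoning about where entities could go missing, here is a simplified, hypothetical model of key-range splitting. It assumes the splitter turns sorted boundary keys into contiguous half-open ranges (`key < b1`, `b1 <= key < b2`, ..., `key >= bn`); integer keys stand in for Datastore keys, and `make_ranges`, `in_range` and `check_coverage` are illustrative names, not Beam API.

```python
def make_ranges(boundaries):
    """Turn boundary keys into contiguous key ranges.

    Each range is a (lo, hi) pair; None means unbounded on that side.
    """
    edges = [None] + sorted(boundaries) + [None]
    return list(zip(edges[:-1], edges[1:]))


def in_range(key, rng, strict_lower=False):
    """Intended semantics: lo <= key < hi. strict_lower=True uses lo < key < hi."""
    lo, hi = rng
    if lo is not None and (key <= lo if strict_lower else key < lo):
        return False
    if hi is not None and key >= hi:
        return False
    return True


def check_coverage(keys, boundaries, strict_lower=False):
    """Return every key that does not fall in exactly one range."""
    ranges = make_ranges(boundaries)
    bad = []
    for key in keys:
        hits = sum(1 for rng in ranges if in_range(key, rng, strict_lower))
        if hits != 1:
            bad.append(key)
    return bad


if __name__ == '__main__':
    keys = list(range(1, 1517))       # 1516 stand-in keys, as in this kind
    boundaries = keys[126::126][:11]  # 11 boundaries -> 12 ranges
    print(len(make_ranges(boundaries)))  # 12
    print(check_coverage(keys, boundaries))  # []
    print(check_coverage(keys, boundaries, strict_lower=True) == boundaries)  # True
```

With correct half-open ranges the coverage check comes back empty; making the lower bound strict drops exactly the boundary keys. This only illustrates one way a split could lose entities and does not reproduce the 365-of-1516 result.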

The code for the job on the new transform is:

```


from __future__ import absolute_import

import logging
import os
import sys

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query

# TODO: should be set outside of python process
os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'


def map_to_id(element):
    face_log_id = element.to_client_entity().id
    return face_log_id


def run(argv=None):
    p = beam.Pipeline(argv=argv)

    project = 'fareclockdev'

    (p
     | 'GetEntities' >> ReadFromDatastore(
         Query(kind='face_apilog', project=project))
     | 'MapToId' >> beam.Map(map_to_id)
     | 'WriteToFile' >> beam.io.WriteToText('result')
    )

    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)

```

For comparison, the code for the job on the old transform is:

```


from __future__ import absolute_import

import logging
import os
import sys

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

# TODO: should be set outside of python process
os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'


def map_to_id(element):
    face_log_id = element.key.path[-1].id
    return face_log_id


def run(argv=None):
    p = beam.Pipeline(argv=argv)

    project = 'fareclockdev'

    query = query_pb2.Query()
    query.kind.add().name = 'face_apilog'

    (p
     | 'GetEntities' >> ReadFromDatastore(project=project, query=query)
     # TODO: ParDo???
     | 'MapToId' >> beam.Map(map_to_id)
     | 'WriteToFile' >> beam.io.WriteToText('result')
    )

    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)

```
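Since both jobs write one ID per line with `WriteToText('result')`, their outputs can be compared directly to see which IDs the new transform skipped. A minimal stdlib sketch, assuming each run's shards were kept under a different prefix: as written, both jobs write to the same `result` prefix, so the `old_result*` and `new_result*` patterns below are hypothetical, and `read_ids` / `compare_runs` are illustrative helper names.

```python
import glob


def read_ids(pattern):
    """Collect the IDs written into all shard files matching pattern."""
    ids = set()
    for path in sorted(glob.glob(pattern)):
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:
                    ids.add(line)
    return ids


def compare_runs(old_pattern, new_pattern):
    """Return (IDs only in the old run, IDs only in the new run), sorted."""
    old_ids = read_ids(old_pattern)
    new_ids = read_ids(new_pattern)
    return sorted(old_ids - new_ids), sorted(new_ids - old_ids)


if __name__ == '__main__':
    # Hypothetical shard prefixes; adjust to wherever each run's output was kept.
    missing, extra = compare_runs('old_result*', 'new_result*')
    print('missing from new run: %d' % len(missing))
    print('unexpected in new run: %d' % len(extra))
```

IDs are compared as strings and collected into sets, so duplicate lines within a run are not counted separately.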



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
