Re: Kafka IO: value of expansion_service
https://github.com/apache/beam/pull/11557

On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw wrote:

> Java dependencies are not yet fully propagated over the expansion service,
> which might be what you're running into. I'm actually in the process of
> putting together a PR to fix this; I'll let you know when it's ready.
Re: Kafka IO: value of expansion_service
Java dependencies are not yet fully propagated over the expansion service,
which might be what you're running into. I'm actually in the process of
putting together a PR to fix this; I'll let you know when it's ready.

On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver wrote:

> I'm not sure about the org.springframework.expression.EvaluationContext
> issue, but "local class incompatible" usually happens when using Beam
> components built from different sources. Make sure to rebuild everything
> from the same commit.
>
> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk wrote:
>
>> After syncing to:
>>
>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>> origin/master, origin/HEAD)
>> Author: Ning Kang
>> Date: Fri Apr 24 10:58:07 2020 -0700
>>
>> The new error is:
>>
>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
>> Custom DoFn With Execution Info
>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>     at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>>     at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.InvalidClassException:
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
>> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
>> local class serialVersionUID = 5488866827627794770
>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>>     ... 18 more
>>
>> I am not sure it is related to
>> https://issues.apache.org/jira/browse/BEAM-9745.
>>
>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk wrote:
>>
>>> Here is an error I am getting when using DirectRunner:
>>>
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the bundle bundle_1 to finish.
>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>>> DEBUG:apache_beam.runners.portability.f
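
As background to the "local class incompatible" message above: Java serialization compares the serialVersionUID recorded in the stream with the one of the class loaded locally, and a class that does not declare the field gets a value derived from its shape, so it can change between builds. The following is a minimal standalone sketch for printing the value a given classpath sees, so it can be compared with the two numbers in the exception. The class name SerialUidDemo and the helper uidOf are made up for illustration and are not part of Beam; running it against org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn assumes the relevant Beam jar is on the classpath.

```java
import java.io.ObjectStreamClass;

public class SerialUidDemo {
    /**
     * Returns the serialVersionUID that Java serialization uses for the named
     * class on the current classpath (declared, or computed if not declared).
     */
    static long uidOf(String className) {
        Class<?> cls;
        try {
            cls = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not on classpath: " + className, e);
        }
        // lookup() returns null for classes that are not Serializable.
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        if (desc == null) {
            throw new IllegalArgumentException(className + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    public static void main(String[] args) {
        // Defaults to a JDK class so the sketch runs without extra jars.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " serialVersionUID = " + uidOf(name));
    }
}
```

Running this once with the jars used by the expansion service and once with the jars used by the SDK harness should show which side produced which number, if the two were indeed built from different sources.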
Recommended Approach for Parallel / Multi-Stage Enrichment
Hi all,

I'm trying to implement a process and I'm not quite sure what the best approach to efficiently implement it might be while taking advantage of Beam's parallelism and recommended patterns. Basically the problem itself can be summarized as follows:

I have a series of incoming events which are read from Kafka into my Beam pipeline. These events are Avro-formatted messages which contain nearly a hundred different fields with other nested records and values about the event (e.g. users, ip addresses, etc.). The goal of the pipeline is twofold:

- Extract any instances of various entities (e.g. users, ip addresses, etc.) from the original object, key them (using a deterministic UUID seeded by all of the known "key" values), and send them off to their own dedicated Kafka topic (e.g. all extracted users -> users_topic).
- Enrich the original event using the identifiers for all of the extracted entities (e.g. if the event came in with an array containing two users, the expectation is that the generated keys would be present on each of those user instances after leaving the pipeline).

My current approach has been to simply build a single transform, to avoid mutating / enriching the event throughout the pipeline for each series of entities, as such:

    // Extract all entities (into a PCollectionTuple)
    val taggedEvents = Pipeline
        .create(options)
        .apply("Read Events from Kafka", KafkaIO.read(options.incomingEventsTopic, options))
        .apply("Identify All Entity Instances", Entities.identify())

    // Users
    taggedEvents
        .get(Entities.Tags.users)
        .apply("Flatten Multiple Identified Users", Flatten.iterables())
        .apply("Write Users to Kafka", KafkaIO.write(options.usersTopic, options))

    // IP Addresses
    taggedEvents
        .get(Entities.Tags.ipAddresses)
        .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
        .apply("Write IP Addresses to Kafka", KafkaIO.write(options.ipAddressesTopic, options))

    // Events (enriched)
    taggedEvents
        .get(Entities.Tags.events)
        .apply("Write Enriched Events to Kafka", KafkaIO.write(options.identifiedEventsTopic, options))

As mentioned, each of these individual extractions for the various entities needs to add the appropriate identifiers onto the original event, so that the identifiers are present when the last call above is made (sending the events to their destination topic).

Currently my Entities.identify() transform basically does the following behind the scenes:

    class Identify() : PTransform>, PCollectionTuple>() {
        override fun expand(input: PCollection>): PCollectionTuple {
            // Take an event in
            return input
                .apply("Extract all available entities",
                    ParDo
                        .of(ExtractAllEntities())
                        .withOutputTags(Tags.events, TupleTagList.of(listOf(
                            Entities.Tags.users,
                            Entities.Tags.computerEndpoints
                        )))
                )
        }
    }

    class ExtractAllEntities() : TraceableDoFn, KV>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
            // Get the event (mutable)
            val event = context.element().value.toMutable()

            // Process the users
            context.output(Entities.Tags.users, Users.extract(event, tracer))

            // Process the computer endpoints
            context.output(Entities.Tags.computerEndpoints, ComputerEndpoints.extract(event, tracer))

            // Tag output
            context.output(Entities.Tags.events, KV.of(context.element().key, event))
        }
    }
    }

All of these extract() function calls are simply private methods, which seems wrong. Creating a single instance of a mutable event allows me to perform all of the mutations and extractions in a single pass (as opposed to doing one, pulling out the event, passing it into another transform, repeated ad nauseam).

So I have a few questions:

- Is there a preferred pattern for handling something like this?
- Should those private methods (e.g. ComputerEndpoints.extract(), Users.extract(), etc.) simply be private functions that take in an event and return an array of the requested entities? Or would this be better served by a DoFn or some other type of transformation?
- In my mind, this seems like it could also be written in such a way that all of the entity-specific transformations yield multiple PCollectionTuples (each with the enriched event and a collection of entity instances) which could be merged at some point. Would there be value in doing that?

Sorry for the long-winded question. I'm still quite new to the Beam ecosystem and I'm just trying to follow the best practices to take advantage of things
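
To make the "deterministic UUID seeded by all of the known key values" idea concrete, here is a minimal sketch in plain Java using the JDK's name-based UUIDs. Everything in it is hypothetical and simplified for illustration (the EntityKeys class, the User type with a single natural key, the separator character); it is not taken from the pipeline above. It also shows the plain-function shape asked about in the questions: a function that takes the entities found on an event, stamps the generated ids onto them, and returns them for separate output.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class EntityKeys {
    /** Derives a stable name-based (version 3) UUID from an entity type and its key values. */
    static UUID keyFor(String entityType, String... keyValues) {
        // Assumption: the unit-separator character never occurs in key values,
        // so ("ab", "c") and ("a", "bc") produce different seeds.
        String seed = entityType + "\u001f" + String.join("\u001f", keyValues);
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8));
    }

    /** Hypothetical, simplified entity: a user with one natural key and a generated id. */
    static final class User {
        final String name;
        UUID id;

        User(String name) {
            this.name = name;
        }
    }

    /**
     * Stamps ids onto the given users in place (the enrichment step) and
     * returns them as a separate list (the extraction step).
     */
    static List<User> extractUsers(List<User> usersOnEvent) {
        List<User> extracted = new ArrayList<>();
        for (User user : usersOnEvent) {
            user.id = keyFor("user", user.name);
            extracted.add(user);
        }
        return extracted;
    }

    public static void main(String[] args) {
        List<User> users = new ArrayList<>();
        users.add(new User("alice"));
        users.add(new User("bob"));
        for (User user : extractUsers(users)) {
            System.out.println(user.name + " -> " + user.id);
        }
    }
}
```

Because the function mutates the objects it is handed, inside a DoFn it would be applied to a copy of the input element, as the toMutable() call in the original code does. The same inputs always yield the same UUID, so re-processing an event produces the same entity keys.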
Re: HCatalogIO - Trying to read table metadata (columns names and indexes)
Hi Noam,

Currently, Beam doesn't support converting HCatRecords to Rows or, in your case, creating a Beam Schema from a Hive table schema when the Hive table has parameterized types. We can use HCatFieldSchema [1] to create the Beam Schema from the Hive table schema. I have created a JIRA ticket to track this issue: https://issues.apache.org/jira/browse/BEAM-9840

[1]: https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java#L34

PS: I am working on supporting this feature. It should be supported in a future release of Apache Beam.

Regards,
Rahul

On Mon, Apr 27, 2020 at 6:57 PM Gershi, Noam wrote:

> Hi
>
> Using HCatalogIO as a source - I am trying to read column tables.
>
> Code:
>
> PCollection hcatRecords = input
>     .apply(HCatalogIO.read()
>         .withConfigProperties(configProperties)
>         .withDatabase("db-name")
>         .withTable("my-table-name"));
> ...
> HCatalogBeamSchema hcatSchema =
>     HCatalogBeamSchema.create(ImmutableMap.of("table", "my-table-name"));
> Schema schema = hcatSchema.getTableSchema("db-name", "my-table-name").get();
> List fields = schema.getFields();
>
> I get:
>
> 20/04/27 09:12:16 INFO LineBufferedStream: Caused by: java.lang.UnsupportedOperationException: The type 'decimal(30,16)' of field 'amount' is not supported.
> 20/04/27 09:12:16 INFO LineBufferedStream: at org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamField(SchemaUtils.java:60)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 20/04/27 09:12:16 INFO LineBufferedStream: at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> 20/04/27 09:12:16 INFO LineBufferedStream: at org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema(SchemaUtils.java:53)
> 20/04/27 09:12:16 INFO LineBufferedStream: at org.apache.beam.sdk.io.hcatalog.HCatalogBeamSchema.getTableSchema(HCatalogBeamSchema.java:83)
>
> Thanks in advance,
> Noam
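
The type string in the exception, 'decimal(30,16)', carries its parameters (precision and scale) inside the name, which is what distinguishes a parameterized type from a plain one such as 'string'. As an illustration only, and not a description of how Beam's SchemaUtils or the planned fix works, the sketch below separates the base type name from its numeric parameters using the JDK regex classes. The class HiveTypeNames and its methods are hypothetical names, and complex types such as array<string> are deliberately out of scope.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HiveTypeNames {
    // Matches a bare name ("string") or a name with parameters ("decimal(30,16)").
    private static final Pattern TYPE =
        Pattern.compile("^\\s*(\\w+)\\s*(?:\\(([^)]*)\\))?\\s*$");

    private static Matcher match(String typeName) {
        Matcher m = TYPE.matcher(typeName);
        if (!m.matches()) {
            // Complex types such as array<string> are not handled by this sketch.
            throw new IllegalArgumentException("Unrecognized type name: " + typeName);
        }
        return m;
    }

    /** Returns the base type in lower case, e.g. "decimal" for "decimal(30,16)". */
    static String baseType(String typeName) {
        return match(typeName).group(1).toLowerCase(Locale.ROOT);
    }

    /** Returns the numeric parameters, e.g. {30, 16}; empty if there are none. */
    static int[] parameters(String typeName) {
        String params = match(typeName).group(2);
        if (params == null || params.trim().isEmpty()) {
            return new int[0];
        }
        String[] parts = params.split(",");
        int[] values = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            values[i] = Integer.parseInt(parts[i].trim());
        }
        return values;
    }

    public static void main(String[] args) {
        String type = "decimal(30,16)";
        int[] p = parameters(type);
        System.out.println(baseType(type) + " precision=" + p[0] + " scale=" + p[1]);
    }
}
```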
Re: Running Nexmark for Flink Streaming
Max, would it make sense to add RocksDB as a runtime-only dependency at the runner level, just to hint at its use? I assume that most Flink users might want to have RocksDB as the default state backend?

    runtimeOnly "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

On Tue, Apr 28, 2020 at 1:14 PM Sruthi Sree Kumar wrote:

> Hello,
>
> I have looked at the Beam code. The state backend that we pass should be an
> instance of FlinkStateBackendFactory, but there is no implementation of that
> interface: none of the Flink state backends implements it.
>
> So even when I try to pass the default MemoryStateBackend as an argument, it
> throws an error.
>
> However, I managed to specify the state backend (filesystem) using the config
> file in the config directory specified by the env variable ENV_FLINK_CONF_DIR.
>
> Regards,
> Sruthi
Re: Running Nexmark for Flink Streaming
Hello,

I have looked at the Beam code. The state backend that we pass should be an instance of FlinkStateBackendFactory, but there is no implementation of that interface: none of the Flink state backends implements it.

So even when I try to pass the default MemoryStateBackend as an argument, it throws an error.

However, I managed to specify the state backend (filesystem) using the config file in the config directory specified by the env variable ENV_FLINK_CONF_DIR.

Regards,
Sruthi

On Tue, Apr 28, 2020 at 11:35 AM Maximilian Michels wrote:

> Hi Sruthi,
>
> Not possible out-of-the-box at the moment. You'll have to add the
> RocksDB Flink dependency in flink_runner.gradle, e.g.:
>
>     compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
>
> Also in the Flink config you have to set
>
>     state.backend: rocksdb
>
> Then you can run Nexmark against the cluster, e.g.
>
>     ./gradlew :sdks:java:testing:nexmark:run \
>       -Pnexmark.runner=":runners:flink:1.10" \
>       -Pnexmark.args="
>         --runner=FlinkRunner
>         --flinkMaster=
>         --streaming=true
>         --shutdownSourcesOnFinalWatermark=true
>         --query=12
>         --suite=SMOKE
>         --manageResources=false
>         --monitorJobs=true
>         --enforceEncodability=true
>         --enforceImmutability=true"
>
> Admittedly, this is a bit complicated. We could make that easier without
> much work.
>
> Cheers,
> Max
Re: Running Nexmark for Flink Streaming
Hi Sruthi,

Not possible out-of-the-box at the moment. You'll have to add the RocksDB Flink dependency in flink_runner.gradle, e.g.:

    compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Also in the Flink config you have to set

    state.backend: rocksdb

Then you can run Nexmark against the cluster, e.g.

    ./gradlew :sdks:java:testing:nexmark:run \
      -Pnexmark.runner=":runners:flink:1.10" \
      -Pnexmark.args="
        --runner=FlinkRunner
        --flinkMaster=
        --streaming=true
        --shutdownSourcesOnFinalWatermark=true
        --query=12
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

Admittedly, this is a bit complicated. We could make that easier without much work.

Cheers,
Max

On 28.04.20 10:26, Sruthi Sree Kumar wrote:

> Hello,
>
> Is it possible to run the Nexmark queries while specifying a
> state backend (e.g. RocksDB)?
>
> Regards,
> Sruthi
Running Nexmark for Flink Streaming
Hello,

Is it possible to run the Nexmark queries while specifying a state backend (e.g. RocksDB)?

Regards,
Sruthi