Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
https://github.com/apache/beam/pull/11557

On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw  wrote:

> Java dependencies are not yet fully propagated over the expansion service,
> which might be what you're running into. I'm actually in the process of
> putting together a PR to fix this; I'll let you know when it's ready.
>
> On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver  wrote:
>
>> I'm not sure about the org.springframework.expression.EvaluationContext
>> issue, but "local class incompatible" usually happens when using Beam
>> components built from different sources. Make sure to rebuild everything
>> from the same commit.
>>
>> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk 
>> wrote:
>>
>>> After syncing to:
>>>
>>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>>> origin/master, origin/HEAD)
>>> Author: Ning Kang 
>>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>>
>>> The new error is:
>>>
>>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
>>> Custom DoFn With Execution Info
>>> at
>>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>> at
>>> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>>> at
>>> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.InvalidClassException:
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
>>> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
>>> local class serialVersionUID = 5488866827627794770
>>> at
>>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>> at
>>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>>> ... 18 more
>>>
>>> I am not sure it is related to
>>> https://issues.apache.org/jira/browse/BEAM-9745.
>>>
>>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk 
>>> wrote:
>>>
 Here is an error I am getting when using DirectRunner:

 DEBUG:apache_

Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
Java dependencies are not yet fully propagated over the expansion service,
which might be what you're running into. I'm actually in the process of
putting together a PR to fix this; I'll let you know when it's ready.

On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver  wrote:

> I'm not sure about the org.springframework.expression.EvaluationContext
> issue, but "local class incompatible" usually happens when using Beam
> components built from different sources. Make sure to rebuild everything
> from the same commit.
>
> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk 
> wrote:
>
>> After syncing to:
>>
>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>> origin/master, origin/HEAD)
>> Author: Ning Kang 
>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>
>> The new error is:
>>
>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
>> Custom DoFn With Execution Info
>> at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> at
>> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>> at
>> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.InvalidClassException:
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
>> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
>> local class serialVersionUID = 5488866827627794770
>> at
>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>> at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>> ... 18 more
>>
>> I am not sure it is related to
>> https://issues.apache.org/jira/browse/BEAM-9745.
>>
>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk 
>> wrote:
>>
>>> Here is an error I am getting when using DirectRunner:
>>>
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for
>>> the bundle bundle_1 to finish.
>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>>> DEBUG:apache_beam.runners.portability.f

Recommended Approach for Parallel / Multi-Stage Enrichment

2020-04-28 Thread Rion Williams
Hi all,

I'm trying to implement a process and I'm not quite sure of the best approach 
for implementing it efficiently while taking advantage of Beam's parallelism 
and recommended patterns. The problem can be summarized as follows:

I have a series of incoming events which are read from Kafka into my Beam 
pipeline. These events are Avro-formatted messages which contain nearly a 
hundred different fields with other nested records and values about the event 
(e.g. users, ip addresses, etc.). The goal of the pipeline is twofold:

- Extract any instances of various entities (e.g. users, ip addresses, etc.) 
from the original object, key them (using a deterministic UUID seeded by all of 
the known "key" values; see the keying sketch just after this list), and send 
them off to their own dedicated Kafka topic (e.g. all extracted users -> 
users_topic).
- Enrich the original event using the identifiers for all of the extracted 
entities (e.g. if the event came in with an array containing two users, the 
expectation is that the generated keys would be present on each of those user 
instances after leaving the pipeline).
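
For reference, a minimal sketch of that deterministic keying, in Java; the 
field names, separator, and exact UUID scheme here are illustrative rather 
than my actual code:

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class EntityKeys {

  // Derive a deterministic, name-based UUID from an entity's identifying fields.
  // The field order and separator are part of the contract: changing either
  // changes every generated key.
  public static UUID deterministicKey(String... keyFields) {
    String joined = String.join("\u0000", keyFields);
    return UUID.nameUUIDFromBytes(joined.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // e.g. a user entity keyed by (username, domain, ip)
    System.out.println(deterministicKey("alice", "example.com", "10.0.0.1"));
    System.out.println(deterministicKey("alice", "example.com", "10.0.0.1")); // identical
  }
}

Because the UUID is name-based, re-processing the same event reproduces the 
same entity key.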

My current approach has been to simply build a single transform to avoid 
mutating / enriching the event throughout the pipeline for each series of 
entities as such:

// Extract all entities (into a PCollectionTuple)
val taggedEvents = Pipeline
    .create(options)
    .apply("Read Events from Kafka",
        KafkaIO.read(options.incomingEventsTopic, options))
    .apply("Identify All Entity Instances", Entities.identify())

// Users
taggedEvents
    .get(Entities.Tags.users)
    .apply("Flatten Multiple Identified Users", Flatten.iterables())
    .apply("Write Users to Kafka",
        KafkaIO.write(options.usersTopic, options))

// IP Addresses
taggedEvents
    .get(Entities.Tags.ipAddresses)
    .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
    .apply("Write IP Addresses to Kafka",
        KafkaIO.write(options.ipAddressesTopic, options))

// Events (enriched)
taggedEvents
    .get(Entities.Tags.events)
    .apply("Write Enriched Events to Kafka",
        KafkaIO.write(options.identifiedEventsTopic, options))

As mentioned, each of these individual entity extractions needs to add the 
appropriate identifiers onto the original event, so that by the time the last 
call above is made (sending events to their destination topic) the enriched 
event already carries them.

Currently my Entities.identify() transform basically does the following behind 
the scenes:

// Note: the generic parameters below were stripped by the list archive;
// "Event" is a placeholder for the actual element type.
class Identify() : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
    override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
        // Take an event in
        return input
            .apply("Extract all available entities",
                ParDo
                    .of(ExtractAllEntities())
                    .withOutputTags(Tags.events, TupleTagList.of(listOf(
                        Entities.Tags.users,
                        Entities.Tags.computerEndpoints
                    )))
            )
    }

    class ExtractAllEntities() : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {

        @ProcessElement
        fun processElement(context: ProcessContext) {
            // Get the event (mutable)
            val event = context.element().value.toMutable()

            // Process the users
            context.output(Entities.Tags.users, Users.extract(event, tracer))

            // Process the computer endpoints
            context.output(Entities.Tags.computerEndpoints,
                ComputerEndpoints.extract(event, tracer))

            // Tag output
            context.output(Entities.Tags.events, KV.of(context.element().key, event))
        }
    }
}

All of these extract() function calls are simply private methods, which seems 
wrong. However, creating a single mutable instance of the event lets me perform 
all of the mutations and extractions in a single pass (as opposed to extracting 
one entity, pulling out the event, passing it into another transform, and so on 
ad nauseam).

So I have a few questions:

- Is there a preferred pattern for handling something like this?
- Should those private methods (e.g. ComputerEndpoints.extract(), 
Users.extract(), etc.) simply be private functions that take in an event and 
return an array of the requested entities? Or would this be better served by a 
DoFn or some other type of transform?
- In my mind, this could also be written so that each of the entity-specific 
transformations yields its own PCollectionTuple (each with the enriched event 
and a collection of entity instances), and those tuples are merged at some 
point. Would there be value in doing that? (A rough sketch of what I mean 
follows this list.)
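
To make that last option concrete, here is a rough sketch, in Java and with a 
hypothetical Event type and extraction logic standing in for the Avro classes 
above, of a single chained entity stage that emits the enriched event on its 
main tag and the extracted entities on a side tag:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ChainedEnrichmentSketch {

  // Hypothetical event with a list of user names and a slot for generated user keys.
  static class Event implements Serializable {
    List<String> users = new ArrayList<>();
    List<String> userKeys = new ArrayList<>();
  }

  static final TupleTag<Event> ENRICHED_EVENTS = new TupleTag<Event>() {};
  static final TupleTag<String> EXTRACTED_USERS = new TupleTag<String>() {};

  // One stage: extract users to a side output and emit an enriched copy of the event.
  static class ExtractUsersFn extends DoFn<Event, Event> {
    @ProcessElement
    public void processElement(@Element Event event, MultiOutputReceiver out) {
      Event enriched = new Event();                 // copy instead of mutating the input
      enriched.users = new ArrayList<>(event.users);
      for (String user : event.users) {
        out.get(EXTRACTED_USERS).output(user);      // would feed the users Kafka topic
        enriched.userKeys.add("key-for-" + user);   // stand-in for the deterministic UUID
      }
      out.get(ENRICHED_EVENTS).output(enriched);
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Event sample = new Event();
    sample.users = Arrays.asList("alice", "bob");

    // Create.of stands in for KafkaIO.read in this sketch.
    PCollection<Event> events =
        pipeline.apply(Create.of(sample).withCoder(SerializableCoder.of(Event.class)));

    PCollectionTuple usersStage =
        events.apply(
            "Extract Users",
            ParDo.of(new ExtractUsersFn())
                .withOutputTags(ENRICHED_EVENTS, TupleTagList.of(EXTRACTED_USERS)));

    // usersStage.get(EXTRACTED_USERS) -> KafkaIO.write to the users topic
    // usersStage.get(ENRICHED_EVENTS) -> input to the next entity stage (IP addresses,
    //                                    etc.); the last stage's enriched output is what
    //                                    finally goes to the events topic.

    pipeline.run().waitUntilFinish();
  }
}

Each subsequent entity stage would take the previous stage's ENRICHED_EVENTS 
output as its input, so the event is copied and enriched once per stage rather 
than mutated in place.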

Sorry for the long-winded question. I'm still quite new to the Beam ecosystem 
and I'm just trying to follow the best practices to take advantage of things 

Re: HCatalogIO - Trying to read table metadata (columns names and indexes)

2020-04-28 Thread rahul patwari
Hi Noam,

Currently, Beam doesn't support converting HCatRecords to Rows (or, in your
case, creating a Beam Schema from the Hive table schema) when the Hive table
has parameterized types.

We can use HCatFieldSchema[1] to create the Beam Schema from the Hive table
schema.
I have created a JIRA ticket to track this issue:
https://issues.apache.org/jira/browse/BEAM-9840

[1]:
https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java#L34
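
For illustration only, here is a rough sketch of how a parameterized type such
as decimal(30,16) could be mapped to a Beam field. This is not the eventual
Beam implementation (that would presumably live in SchemaUtils.toBeamField and
use HCatFieldSchema's type information rather than parsing type strings):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

public class HiveTypeSketch {

  // Map a Hive type string to a Beam Schema.Field, dropping type parameters.
  static Schema.Field toBeamField(String name, String hiveType) {
    String base = hiveType.contains("(")
        ? hiveType.substring(0, hiveType.indexOf('(')).trim()
        : hiveType.trim();
    switch (base.toLowerCase()) {
      case "decimal":
        // Beam's DECIMAL is arbitrary-precision, so precision/scale parameters
        // such as (30,16) are carried by the values rather than the schema.
        return Schema.Field.of(name, FieldType.DECIMAL);
      case "char":
      case "varchar":
      case "string":
        return Schema.Field.of(name, FieldType.STRING);
      default:
        throw new UnsupportedOperationException("Unsupported Hive type: " + hiveType);
    }
  }

  public static void main(String[] args) {
    Schema schema = Schema.builder()
        .addField(toBeamField("amount", "decimal(30,16)"))
        .addField(toBeamField("name", "varchar(64)"))
        .build();
    System.out.println(schema);
  }
}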

PS: I am working on supporting this feature. It should be available in a
future release of Apache Beam.

Regards,
Rahul

On Mon, Apr 27, 2020 at 6:57 PM Gershi, Noam  wrote:

> Hi
>
> Using HCatalogIO as a source, I am trying to read the table columns.
>
>
>
> Code:
>
> PCollection<HCatRecord> hcatRecords = input
>     .apply(HCatalogIO.read()
>         .withConfigProperties(configProperties)
>         .withDatabase("db-name")
>         .withTable("my-table-name"));
>
> ...
>
> HCatalogBeamSchema hcatSchema =
>     HCatalogBeamSchema.create(ImmutableMap.of("table", "my-table-name"));
>
> Schema schema = hcatSchema.getTableSchema("db-name", "my-table-name").get();
>
> List<Schema.Field> fields = schema.getFields();
>
>
>
>
>
> I get:
>
>
>
> 20/04/27 09:12:16 INFO LineBufferedStream: Caused by:
> java.lang.UnsupportedOperationException: The type 'decimal(30,16)' of field
> 'amount' is not supported.
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamField(SchemaUtils.java:60)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema(SchemaUtils.java:53)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:  at
> org.apache.beam.sdk.io.hcatalog.HCatalogBeamSchema.getTableSchema(HCatalogBeamSchema.java:83)
>
>
>
> Thanx in advance,
>
> Noam
>
>
>


Re: Running Nexmark for Flink Streaming

2020-04-28 Thread Ismaël Mejía
Max, would it make sense to make the RocksDB dependency runtimeOnly at the
runner level, just to hint at its use? I assume that most Flink users would
want RocksDB as the default state backend.

runtimeOnly
"org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"


On Tue, Apr 28, 2020 at 1:14 PM Sruthi Sree Kumar
 wrote:
>
> Hello,
>
> I have looked the Beam code. The statebackend that we pass should be an 
> instance of  FlinkStateBackendFactory . But there is no implementation for 
> the Interface.
> None of the FlinkStateBackend Implements this interface.
>
> So even when I try to pass the default MemoryStateBackend as an argument, it 
> throws an error.
>
> But I could manage to specify the statebackend (filesystem) using the config 
> file in the config directory specified by the env variable ENV_FLINK_CONF_DIR.
>
> Regards,
> Sruthi
>
> On Tue, Apr 28, 2020 at 11:35 AM Maximilian Michels  wrote:
>>
>> Hi Sruthi,
>>
>> Not possible out-of-the-box at the moment. You'll have to add the
>> RocksDB Flink dependency in flink_runner.gradle, e.g.:
>>
>>   compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
>>
>> Also in the Flink config you have to set
>>
>>   state.backend: rocksdb
>>
>> Then you can run Nexmark against the cluster, e.g.
>>
>> ./gradlew :sdks:java:testing:nexmark:run \
>> -Pnexmark.runner=":runners:flink:1.10" \
>> -Pnexmark.args="
>> --runner=FlinkRunner
>> --flinkMaster=
>> --streaming=true
>> --shutdownSourcesOnFinalWatermark=true
>> --query=12
>> --suite=SMOKE
>> --manageResources=false
>> --monitorJobs=true
>> --enforceEncodability=true
>> --enforceImmutability=true"
>>
>>
>> Admittedly, this is a bit complicated. We could make that easier without
>> much work.
>>
>> Cheers,
>> Max
>>
>> On 28.04.20 10:26, Sruthi Sree Kumar wrote:
>> > Hello,
>> >
>> > Is it possible to run the nexmark queries by specifying a
>> > state-backed(Ex: RocksDB) ?
>> >
>> >
>> > Regards,
>> > Sruthi


Re: Running Nexmark for Flink Streaming

2020-04-28 Thread Sruthi Sree Kumar
Hello,

I have looked at the Beam code. The state backend that we pass should be an
instance of FlinkStateBackendFactory, but there is no implementation of that
interface; none of the Flink state backend classes implements it.

So even when I try to pass the default MemoryStateBackend as an argument,
it throws an error.
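
For reference, a rough, untested sketch of what an implementation of that
interface might look like, assuming it exposes a single
createStateBackend(FlinkPipelineOptions) method and that the
flink-statebackend-rocksdb dependency (see the gradle snippet below) is on
the classpath:

import java.io.IOException;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;

public class RocksDbStateBackendFactory implements FlinkStateBackendFactory {

  @Override
  public StateBackend createStateBackend(FlinkPipelineOptions options) {
    try {
      // Example checkpoint location; a real job would point at durable storage.
      return new RocksDBStateBackend("file:///tmp/flink-checkpoints", true);
    } catch (IOException e) {
      throw new RuntimeException("Unable to create RocksDB state backend", e);
    }
  }
}

Such a class could then, presumably, be referenced through the
stateBackendFactory pipeline option.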

But I did manage to specify the state backend (filesystem) using the config
file in the config directory pointed to by the env variable
ENV_FLINK_CONF_DIR.

Regards,
Sruthi

On Tue, Apr 28, 2020 at 11:35 AM Maximilian Michels  wrote:

> Hi Sruthi,
>
> Not possible out-of-the-box at the moment. You'll have to add the
> RocksDB Flink dependency in flink_runner.gradle, e.g.:
>
>   compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
>
> Also in the Flink config you have to set
>
>   state.backend: rocksdb
>
> Then you can run Nexmark against the cluster, e.g.
>
> ./gradlew :sdks:java:testing:nexmark:run \
> -Pnexmark.runner=":runners:flink:1.10" \
> -Pnexmark.args="
> --runner=FlinkRunner
> --flinkMaster=
> --streaming=true
> --shutdownSourcesOnFinalWatermark=true
> --query=12
> --suite=SMOKE
> --manageResources=false
> --monitorJobs=true
> --enforceEncodability=true
> --enforceImmutability=true"
>
>
> Admittedly, this is a bit complicated. We could make that easier without
> much work.
>
> Cheers,
> Max
>
> On 28.04.20 10:26, Sruthi Sree Kumar wrote:
> > Hello,
> >
> > Is it possible to run the nexmark queries by specifying a
> > state-backed(Ex: RocksDB) ?
> >
> >
> > Regards,
> > Sruthi
>


Re: Running Nexmark for Flink Streaming

2020-04-28 Thread Maximilian Michels
Hi Sruthi,

Not possible out-of-the-box at the moment. You'll have to add the
RocksDB Flink dependency in flink_runner.gradle, e.g.:

  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Also in the Flink config you have to set

  state.backend: rocksdb

Then you can run Nexmark against the cluster, e.g.

./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.10" \
-Pnexmark.args="
--runner=FlinkRunner
--flinkMaster=
--streaming=true
--shutdownSourcesOnFinalWatermark=true
--query=12
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"


Admittedly, this is a bit complicated. We could make that easier without
much work.

Cheers,
Max

On 28.04.20 10:26, Sruthi Sree Kumar wrote:
> Hello,
> 
> Is it possible to run the nexmark queries by specifying a
> state-backed(Ex: RocksDB) ?
> 
> 
> Regards,
> Sruthi


Running Nexmark for Flink Streaming

2020-04-28 Thread Sruthi Sree Kumar
Hello,

Is it possible to run the Nexmark queries by specifying a state backend
(e.g. RocksDB)?


Regards,
Sruthi