Great, this looks fine to me. Can you raise a JIRA with the issue
description and thereafter raise a PR for this change?
Thanks,
Vicky

On Sat, Apr 28, 2018 at 3:22 AM, Kidwell, Jack <jack_kidwel...@comcast.com>
wrote:

> I wiped my Windows directory, but since I was using tags/gobblin_0.10.0,
> it's pointless to troubleshoot it.
>
>
>
> On my Fedora workstation, I'm using tags/release-0.12.0-rc2, and patched
>
> gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java.
>
>
>
>
> The tests pass. How do I get this trivial change in? Do I make a branch and
> a pull request?
>
>
>
> Here’s the output:
>
>
>
> ./gradlew :gobblin-modules:gobblin-kafka-common:test
>
> Parallel execution with configuration on demand is an incubating feature.
>
> :buildSrc:compileJava UP-TO-DATE
>
> :buildSrc:compileGroovy UP-TO-DATE
>
> :buildSrc:processResources UP-TO-DATE
>
> :buildSrc:classes UP-TO-DATE
>
> :buildSrc:jar UP-TO-DATE
>
> :buildSrc:assemble UP-TO-DATE
>
> :buildSrc:compileTestJava UP-TO-DATE
>
> :buildSrc:compileTestGroovy UP-TO-DATE
>
> :buildSrc:processTestResources UP-TO-DATE
>
> :buildSrc:testClasses UP-TO-DATE
>
> :buildSrc:test UP-TO-DATE
>
> :buildSrc:check UP-TO-DATE
>
> :buildSrc:build UP-TO-DATE
>
> Build property: gobblinFlavor=standard
>
> Build property: jdkVersion=1.8
>
> Build property: sonatypeArtifactRepository=https://oss.sonatype.org/service/local/staging/deploy/maven2/
>
> Build property: sonatypeArtifactSnapshotRepository=https://oss.sonatype.org/content/repositories/snapshots/
>
> Build property: nexusArtifactRepository=https://repository.apache.org/service/local/staging/deploy/maven2
>
> Build property: nexusArtifactSnapshotRepository=https://repository.apache.org/content/repositories/snapshots
>
> Build property: doNotSignArtifacts=false
>
> Build property: avroVersion=1.8.1
>
> Build property: awsVersion=1.11.8
>
> Build property: bytemanVersion=2.2.1
>
> Build property: confluentVersion=2.0.1
>
> Build property: hadoopVersion=2.3.0
>
> Build property: hiveVersion=1.0.1
>
> Build property: kafka08Version=0.8.2.2
>
> Build property: kafka09Version=0.9.0.1
>
> Build property: pegasusVersion=11.0.0
>
> Build property: salesforceVersion=37.0.3
>
> Detected Gradle version major=2 minor=13
>
> Build property: publishToMaven=false
>
> Build property: publishToNexus=false
>
> Release Version: 0.12.0
>
> :gobblin-api:compileJava
>
> :gobblin-utility:processResources
>
> :gobblin-metrics-libs:gobblin-metrics-base:generateAvro UP-TO-DATE
>
> :gobblin-metrics-libs:gobblin-metrics-base:processResources
>
> :gobblin-modules:gobblin-metrics-graphite:processResources UP-TO-DATE
>
> :gobblin-modules:gobblin-metrics-hadoop:processResources UP-TO-DATE
>
> :gobblin-modules:gobblin-metrics-influxdb:processResources UP-TO-DATE
>
> :gobblin-metrics-libs:gobblin-metrics:processResources UP-TO-DATE
>
> :gobblin-modules:gobblin-codecs:processResources UP-TO-DATE
>
> :gobblin-core-base:processResources UP-TO-DATE
>
> :gobblin-test-utils:processResources
>
> :gobblin-config-management:gobblin-config-core:processResources
>
> :gobblin-config-management:gobblin-config-client:processResources
> UP-TO-DATE
>
> :gobblin-modules:gobblin-kafka-common:processResources UP-TO-DATE
>
> :gobblin-modules:gobblin-kafka-common:processTestResources
>
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/
> java/org/apache/gobblin/runtime/BasicTestControlMessage.java:31: warning:
> Generating equals/hashCode implementation but without a call to superclass,
> even though this class does not extend java.lang.Object. If this is
> intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
>
> @EqualsAndHashCode
>
> ^
>
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/
> java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java:32:
> warning: Generating equals/hashCode implementation but without a call to
> superclass, even though this class does not extend java.lang.Object. If
> this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your
> type.
>
> @EqualsAndHashCode
>
> ^
>
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/
> java/org/apache/gobblin/stream/FlushControlMessage.java:31: warning:
> Generating equals/hashCode implementation but without a call to superclass,
> even though this class does not extend java.lang.Object. If this is
> intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
>
> @EqualsAndHashCode
>
> ^
>
> Note: Some input files use or override a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
> 3 warnings
>
> :gobblin-api:processResources UP-TO-DATE
>
> :gobblin-api:classes
>
> :gobblin-api:jar
>
> :gobblin-utility:compileJava
>
> :gobblin-modules:gobblin-codecs:compileJava
>
> :gobblin-modules:gobblin-codecs:classes
>
> :gobblin-modules:gobblin-codecs:jar
>
> Note: Some input files use or override a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
> :gobblin-utility:classes
>
> :gobblin-utility:jar
>
> :gobblin-metrics-libs:gobblin-metrics-base:compileJava
>
> :gobblin-test-utils:compileJavaNote: /home/spruitt/git/incubator-
> gobblin/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
> uses or overrides a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
>
>
> :gobblin-test-utils:classes
>
> :gobblin-test-utils:jar
>
> :gobblin-config-management:gobblin-config-core:compileJava
>
> :gobblin-config-management:gobblin-config-core:classes
>
> :gobblin-config-management:gobblin-config-core:jar
>
> :gobblin-config-management:gobblin-config-client:compileJavaNote: Some
> input files use or override a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
>
>
> :gobblin-metrics-libs:gobblin-metrics-base:classes
>
> :gobblin-metrics-libs:gobblin-metrics-base:jar
>
> :gobblin-modules:gobblin-metrics-graphite:compileJava
>
> :gobblin-modules:gobblin-metrics-hadoop:compileJavaNote:
> /home/spruitt/git/incubator-gobblin/gobblin-modules/
> gobblin-metrics-graphite/src/main/java/org/apache/gobblin/
> metrics/graphite/GraphiteEventReporter.java uses or overrides a
> deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
>
>
> :gobblin-modules:gobblin-metrics-graphite:classes
>
> :gobblin-modules:gobblin-metrics-graphite:jar
>
> :gobblin-modules:gobblin-metrics-influxdb:compileJava
>
> :gobblin-modules:gobblin-metrics-hadoop:classes
>
> :gobblin-modules:gobblin-metrics-hadoop:jar
>
> :gobblin-config-management:gobblin-config-client:classes
>
> :gobblin-config-management:gobblin-config-client:jar
>
> Note: /home/spruitt/git/incubator-gobblin/gobblin-modules/
> gobblin-metrics-influxdb/src/main/java/org/apache/gobblin/
> metrics/influxdb/InfluxDBEventReporter.java uses or overrides a
> deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> :gobblin-modules:gobblin-metrics-influxdb:classes
>
> :gobblin-modules:gobblin-metrics-influxdb:jar
>
> :gobblin-metrics-libs:gobblin-metrics:compileJava
>
> :gobblin-metrics-libs:gobblin-metrics:classes
>
> :gobblin-metrics-libs:gobblin-metrics:jar
>
> :gobblin-core-base:compileJavaNote: Some input files use or override a
> deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
>
>
> :gobblin-core-base:classes
>
> :gobblin-core-base:jar
>
> :gobblin-modules:gobblin-kafka-common:compileJavaNote: Some input files
> use or override a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
>
>
> :gobblin-modules:gobblin-kafka-common:classes
>
> :gobblin-modules:gobblin-kafka-common:compileTestJavaNote:
> /home/spruitt/git/incubator-gobblin/gobblin-modules/
> gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/
> KafkaAvroEventReporterTest.java uses or overrides a deprecated API.
>
> Note: Recompile with -Xlint:deprecation for details.
>
> Note: Some input files use unchecked or unsafe operations.
>
> Note: Recompile with -Xlint:unchecked for details.
>
>
>
> :gobblin-modules:gobblin-kafka-common:testClasses
>
> :gobblin-modules:gobblin-kafka-common:test
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testIdSchemaCaching STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testIdSchemaCaching PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testMaxReferences STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testMaxReferences PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.
> CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadConverterTest.testConverter STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadConverterTest.testConverter PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadExtractingConverterTest. STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadExtractingConverterTest. PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.converter.EnvelopeSchemaConverterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.converter.EnvelopeSchemaConverterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> writer.KafkaWriterHelperTest.testSharedConfig STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> writer.KafkaWriterHelperTest.testSharedConfig PASSED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.kafka.LoggingPusherTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > 
> org.apache.gobblin.metrics.kafka.LoggingPusherTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testInvalidString STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testInvalidString PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testValidString STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testValidString PASSED
>
>
>
> BUILD SUCCESSFUL
>
>
>
> Total time: 37.075 secs
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Wednesday, April 25, 2018 11:56 PM
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> It is nice to hear that you got it working.
>
> Could you please post the details of the error you are seeing in the
> Windows build? Maybe someone can fix it. Also, on Windows you could try
> building with Cygwin; I'm not sure how you were doing it. I am personally
> using Ubuntu for development.
>
> We will need to get this change committed; can you run the tests as well
> and see whether these changes pass all of them?
>
> Lastly, move to the latest Gobblin distribution; you may have to rework the
> change, as the package names changed after the project moved to Apache
> incubation.
>
>
>
> Thanks,
>
> Vicky
>
>
>
>
>
> On Thu, Apr 26, 2018 at 3:48 AM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> I set up a Fedora workstation where Gobblin built on the first try. It was
> failing under Windows.
>
>
>
> I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution,
> installed it, and ran with it. It works using these job configs:
>
>
>
> source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
>
> gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
>
>
>
>
>
> Can this patch be applied to the repository?
>
>
>
> diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
>
> index 9001df134..d0813f9a3 100644
>
> --- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
>
> +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
>
> @@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;
>
>  import com.google.gson.Gson;
>
> +import gobblin.annotation.Alias;
>  import gobblin.configuration.WorkUnitState;
>  import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
>  import gobblin.source.extractor.Extractor;
>
> +@Alias("KafkaSimpleJsonExtractor")
>  public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {
>
>      private static final Gson gson = new Gson();
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 19, 2018 11:31 PM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Which script are you using to make the build? Are you not using these:
>
>
>
>    1. Skip tests and build the distribution: run ./gradlew build -x
>       findbugsMain -x test -x rat -x checkstyleMain. The distribution will
>       be created in the build/gobblin-distribution/distributions directory.
>    (or)
>    2. Run tests and build the distribution (requires Maven): run ./gradlew
>       build. The distribution will be created in the
>       build/gobblin-distribution/distributions directory.
>
> These are taken from here: https://github.com/apache/incubator-gobblin/blob/master/README.md
>
>
>
> I have not had a chance to add the Alias annotation; I am a bit busy this
> weekend. I will try to carve out some time and see if I can test it.
>
>
>
>
>
>
>
>
>
> On Fri, Apr 20, 2018 at 5:58 AM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> So far I have failed to build Gobblin. I'm currently stuck on this error:
>
>
>
> FAILURE: Build failed with an exception.
>
>
>
> * Where:
>
> Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32
>
>
>
> * What went wrong:
>
> A problem occurred evaluating script.
>
> > Failed to apply plugin [id 'org.gradle.java']
>
>    > Value is null
>
>
>
>
>
> Did you have a chance to research adding the annotation
> @Alias("KafkaSimpleJsonExtractor")?
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Saturday, April 14, 2018 12:19 AM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Yes, you seem to be correct; the code never lies :)
>
>
>
> Can you try implementing your own customized Extractor that extends
> KafkaSimpleJsonExtractor and carries the Alias annotation?
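>
> (A minimal sketch of what I mean, assuming the 0.10.0 package names and
> that KafkaSimpleJsonExtractor exposes a constructor taking a WorkUnitState;
> the class name and alias value below are only examples.)
>
> package com.example.gobblin;
>
> import gobblin.annotation.Alias;
> import gobblin.configuration.WorkUnitState;
> import gobblin.source.extractor.extract.kafka.KafkaSimpleJsonExtractor;
>
> // Re-exposes the stock JSON extractor under an alias so UniversalKafkaSource
> // can resolve it via gobblin.source.kafka.extractorType.
> @Alias("MyKafkaSimpleJsonExtractor")
> public class MyKafkaSimpleJsonExtractor extends KafkaSimpleJsonExtractor {
>
>     public MyKafkaSimpleJsonExtractor(WorkUnitState state) {
>         super(state);
>     }
> }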
>
>
>
> The other option would be to add the Alias annotation to
> KafkaSimpleJsonExtractor itself. I need to see what implications that would
> have and why it was not done earlier. I need some time to look deeper into
> it, and hope to find some during the weekend.
>
>
>
>
>
>
>
>
>
> On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> Our correspondence helps with understanding Gobblin. :)
>
>
>
> We're focusing on the source, so no writers are involved yet. We want to
> extract both the key and the value from each Kafka message.
>
>
>
> There are three classes that extend the abstract class KafkaSource:
>
> KafkaDeserializerSource,
>
> KafkaSimpleSource and
>
> UniversalKafkaSource.
>
>
>
> Each has a method, getExtractor(), that returns one of these respective
> instances:
>
>
>
> KafkaDeserializerExtractor;
>
> KafkaSimpleExtractor and
>
> a class found by ClassAliasResolver.resolveClass().
>
>
>
> The first two have a method, decodeRecord(), that takes a
> ByteArrayBasedKafkaRecord parameter, and each calls
> ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls
> ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won't work for us.
>
>
>
> The UniversalKafkaSource class looks interesting because it will search
> for a class that extends KafkaExtractor and instantiate it. But there's a
> catch: the class must be annotated with @Alias. If class
> KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use
> UniversalKafkaSource along with the property
> gobblin.source.kafka.extractorType. As you know,
> KafkaSimpleJsonExtractor.decodeRecord() combines both
> ByteArrayBasedKafkaRecord.getKeyBytes() and
> ByteArrayBasedKafkaRecord.getMessageBytes(), the result we seek.
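>
> (If the alias existed, I would expect the job config to look something like
> this, using the property named above; the package prefix here assumes the
> pre-Apache 0.10.0 layout.)
>
> source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
>
> gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor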
>
>
>
> Or did we miss something?
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Friday, April 13, 2018 12:01 AM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Do you not have this configuration?
>
> ***********************************************************************
>
> writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
>
> writer.file.path.type=tablename
>
> writer.destination.type=HDFS
>
> writer.output.format=txt
>
> ***********************************************************************
>
>
> I scanned the code for SimpleDataWriterBuilder, which uses SimpleDataWriter
> to write the byte array to HDFS (I don't see the HDFS configuration, so I
> assume the file is only in HDFS format and would be copied to the Hadoop
> file system after it is created).
>
>
>
>
>
> It seems that you want to store the key/value in JSON format (or some other
> format) and not the way it is done by the default SimpleDataWriter. You may
> look into configuring the KafkaDeserializerExtractor with the required
> deserializer type, such as JSON (or another type).
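>
> (Roughly along these lines as job properties; treat the property name
> "kafka.deserializer.type", the value "GSON", and the source class below as
> assumptions to verify against the KafkaDeserializerExtractor and
> KafkaDeserializerSource code, and adjust the package prefix to your Gobblin
> version.)
>
> source.class=gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
>
> kafka.deserializer.type=GSON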
>
>
>
> Once this is done, you can use KafkaSimpleJsonExtractor, which is what was
> asked about at the beginning.
>
>
>
> So, to conclude, what I understand is that you don't want the data stored
> as a raw byte array; you require JSON (or another format).
>
>
>
>
>
>
>
>
>
>
>
> On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> Thank you for your time spent on our question.
>
>
>
> This quick start document,
>
> https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
>
> shows
> "source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource".
> That class overrides the method getExtractor() to return "new
> KafkaSimpleExtractor(state)", and the class KafkaSimpleExtractor overrides
> the method decodeRecord() to return just the Kafka message value:
>
>
>
>   return kafkaConsumerRecord.getMessageBytes()
>
>
>
> Since we want both key and value stored in HDFS, the class
> KafkaSimpleJsonExtractor appears to provide the desired decodeRecord()
> method:
>
>
>
>     protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
>         long offset = messageAndOffset.getOffset();
>
>         byte[] keyBytes = messageAndOffset.getKeyBytes();
>         String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
>
>         byte[] payloadBytes = messageAndOffset.getMessageBytes();
>         String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
>
>         KafkaRecord record = new KafkaRecord(offset, key, payload);
>
>         byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
>         return decodedRecord;
>     }
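>
> (With that, each record would presumably land in HDFS as a small JSON
> object along the lines of {"offset":1234,"key":"...","payload":"..."},
> assuming KafkaRecord's field names match its constructor parameters; this
> is an illustration, not output copied from a run.)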
>
>
>
> The architecture diagram leads us to think that the extractor is the point
> where both the Kafka key and value are visible in the Gobblin pipeline:
>
> https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs
>
>
>
> We are eager to learn from you.
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 9:45 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> For writing the data to HDFS you need to take a look at the Writer
> implementation; please look into it. The Extractor is used to pull the
> data, which is then passed through various stages before it is stored via
> the Writer.
>
> What I meant by the other subscriber was to add one more consumer that will
> read the key/value, e.g. kafka-console-consumer.sh, which comes with Kafka.
>
> I was not aware earlier that you wanted to store the data in HDFS; since
> you had given a reference to the Extractor, it was not clear to me.
>
> Please take a look at the Kafka-to-HDFS writer and see if that helps; in
> case it does not do exactly what you require, you may have to plug in a
> customized writer.
>
>
>
>
>
> On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> We want to extract both the key and the value from each Kafka message and
> publish the combined information to HDFS. Keys contain numeric IDs for the
> strings contained in values.
>
>
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
>
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have another subscriber to Kafka that renders the values,
> rather than having the KafkaExtractor implementation render them?
>
>
>
>
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <
> jack_kidwel...@comcast.com> wrote:
>
> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use the module KafkaSimpleJsonExtractor.java
> in order to see both Kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
