Great, this looks fine to me. Can you raise a JIRA with the issue description and thereafter raise a PR for this change?

Thanks,
Vicky
On Sat, Apr 28, 2018 at 3:22 AM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:

> I wiped my Windows directory, but because I was using tags/gobblin_0.10.0
> it’s pointless to troubleshoot it.
>
> On my Fedora workstation, I’m using tags/release-0.12.0-rc2, and patched
> gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java.
>
> The tests pass. How do I get this trivial change in? Do I make a branch and
> a pull request?
>
> Here’s the output:
>
> ./gradlew :gobblin-modules:gobblin-kafka-common:test
> Parallel execution with configuration on demand is an incubating feature.
> :buildSrc:compileJava UP-TO-DATE
> :buildSrc:compileGroovy UP-TO-DATE
> :buildSrc:processResources UP-TO-DATE
> :buildSrc:classes UP-TO-DATE
> :buildSrc:jar UP-TO-DATE
> :buildSrc:assemble UP-TO-DATE
> :buildSrc:compileTestJava UP-TO-DATE
> :buildSrc:compileTestGroovy UP-TO-DATE
> :buildSrc:processTestResources UP-TO-DATE
> :buildSrc:testClasses UP-TO-DATE
> :buildSrc:test UP-TO-DATE
> :buildSrc:check UP-TO-DATE
> :buildSrc:build UP-TO-DATE
> Build property: gobblinFlavor=standard
> Build property: jdkVersion=1.8
> Build property: sonatypeArtifactRepository=https://oss.sonatype.org/service/local/staging/deploy/maven2/
> Build property: sonatypeArtifactSnapshotRepository=https://oss.sonatype.org/content/repositories/snapshots/
> Build property: nexusArtifactRepository=https://repository.apache.org/service/local/staging/deploy/maven2
> Build property: nexusArtifactSnapshotRepository=https://repository.apache.org/content/repositories/snapshots
> Build property: doNotSignArtifacts=false
> Build property: avroVersion=1.8.1
> Build property: awsVersion=1.11.8
> Build property: bytemanVersion=2.2.1
> Build property: confluentVersion=2.0.1
> Build property: hadoopVersion=2.3.0
> Build property: hiveVersion=1.0.1
> Build property: kafka08Version=0.8.2.2
> Build property: kafka09Version=0.9.0.1
> Build property: pegasusVersion=11.0.0
> Build property: salesforceVersion=37.0.3
> Detected Gradle version major=2 minor=13
> Build property: publishToMaven=false
> Build property: publishToNexus=false
> Release Version: 0.12.0
> :gobblin-api:compileJava
> :gobblin-utility:processResources
> :gobblin-metrics-libs:gobblin-metrics-base:generateAvro UP-TO-DATE
> :gobblin-metrics-libs:gobblin-metrics-base:processResources
> :gobblin-modules:gobblin-metrics-graphite:processResources UP-TO-DATE
> :gobblin-modules:gobblin-metrics-hadoop:processResources UP-TO-DATE
> :gobblin-modules:gobblin-metrics-influxdb:processResources UP-TO-DATE
> :gobblin-metrics-libs:gobblin-metrics:processResources UP-TO-DATE
> :gobblin-modules:gobblin-codecs:processResources UP-TO-DATE
> :gobblin-core-base:processResources UP-TO-DATE
> :gobblin-test-utils:processResources
> :gobblin-config-management:gobblin-config-core:processResources
> :gobblin-config-management:gobblin-config-client:processResources UP-TO-DATE
> :gobblin-modules:gobblin-kafka-common:processResources UP-TO-DATE
> :gobblin-modules:gobblin-kafka-common:processTestResources
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/runtime/BasicTestControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
> @EqualsAndHashCode
> ^
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java:32: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
> @EqualsAndHashCode
> ^
> /home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
> @EqualsAndHashCode
> ^
> Note: Some input files use or override a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
> 3 warnings
> :gobblin-api:processResources UP-TO-DATE
> :gobblin-api:classes
> :gobblin-api:jar
> :gobblin-utility:compileJava
> :gobblin-modules:gobblin-codecs:compileJava
> :gobblin-modules:gobblin-codecs:classes
> :gobblin-modules:gobblin-codecs:jar
> Note: Some input files use or override a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
> :gobblin-utility:classes
> :gobblin-utility:jar
> :gobblin-metrics-libs:gobblin-metrics-base:compileJava
> :gobblin-test-utils:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java uses or overrides a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
>
> :gobblin-test-utils:classes
> :gobblin-test-utils:jar
> :gobblin-config-management:gobblin-config-core:compileJava
> :gobblin-config-management:gobblin-config-core:classes
> :gobblin-config-management:gobblin-config-core:jar
> :gobblin-config-management:gobblin-config-client:compileJavaNote: Some input files use or override a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
>
> :gobblin-metrics-libs:gobblin-metrics-base:classes
> :gobblin-metrics-libs:gobblin-metrics-base:jar
> :gobblin-modules:gobblin-metrics-graphite:compileJava
> :gobblin-modules:gobblin-metrics-hadoop:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-graphite/src/main/java/org/apache/gobblin/metrics/graphite/GraphiteEventReporter.java uses or overrides a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
>
> :gobblin-modules:gobblin-metrics-graphite:classes
> :gobblin-modules:gobblin-metrics-graphite:jar
> :gobblin-modules:gobblin-metrics-influxdb:compileJava
> :gobblin-modules:gobblin-metrics-hadoop:classes
> :gobblin-modules:gobblin-metrics-hadoop:jar
> :gobblin-config-management:gobblin-config-client:classes
> :gobblin-config-management:gobblin-config-client:jar
> Note: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-influxdb/src/main/java/org/apache/gobblin/metrics/influxdb/InfluxDBEventReporter.java uses or overrides a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> :gobblin-modules:gobblin-metrics-influxdb:classes
> :gobblin-modules:gobblin-metrics-influxdb:jar
> :gobblin-metrics-libs:gobblin-metrics:compileJava
> :gobblin-metrics-libs:gobblin-metrics:classes
> :gobblin-metrics-libs:gobblin-metrics:jar
> :gobblin-core-base:compileJavaNote: Some input files use or override a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
>
> :gobblin-core-base:classes
> :gobblin-core-base:jar
> :gobblin-modules:gobblin-kafka-common:compileJavaNote: Some input files use or override a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
>
> :gobblin-modules:gobblin-kafka-common:classes
> :gobblin-modules:gobblin-kafka-common:compileTestJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java uses or overrides a deprecated API.
> Note: Recompile with -Xlint:deprecation for details.
> Note: Some input files use unchecked or unsafe operations.
> Note: Recompile with -Xlint:unchecked for details.
>
> :gobblin-modules:gobblin-kafka-common:testClasses
> :gobblin-modules:gobblin-kafka-common:test
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds PASSED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter STARTED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter PASSED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig PASSED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. STARTED
> Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString PASSED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString STARTED
> Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString PASSED
>
> BUILD SUCCESSFUL
>
> Total time: 37.075 secs
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Wednesday, April 25, 2018 11:56 PM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> It is nice to hear that you got it working.
>
> Could you please post the details of the error that you are seeing in the
> Windows build? Maybe someone can fix it. Also, with Windows you could try
> building using Cygwin; I am not sure how you were doing it. I am personally
> using Ubuntu for development.
>
> We may need to get this change committed. Can you run the tests too and
> see if these changes pass all of them?
>
> Lastly, move to the latest Gobblin distribution; you may have to rework the
> patch, as the package names changed after the project moved to Apache
> incubation.
>
> Thanks,
> Vicky
>
> On Thu, Apr 26, 2018 at 3:48 AM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> I set up a Fedora workstation where Gobblin built the first time. It was
> failing under Windows.
>
> I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution,
> installed it and ran with it.
> And it works using these job configs:
>
> source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
> gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
>
> Can this patch be applied to the repository?
>
> diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> index 9001df134..d0813f9a3 100644
> --- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> @@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;
>
>  import com.google.gson.Gson;
>
> +import gobblin.annotation.Alias;
>  import gobblin.configuration.WorkUnitState;
>  import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
>  import gobblin.source.extractor.Extractor;
>
> +@Alias("KafkaSimpleJsonExtractor")
>  public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {
>
>    private static final Gson gson = new Gson();
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 19, 2018 11:31 PM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> Which script are you using to make the build? Are you not using one of
> these?
>
> 1. Skip tests and build the distribution: Run
>    ./gradlew build -x findbugsMain -x test -x rat -x checkstyleMain
>    The distribution will be created in the
>    build/gobblin-distribution/distributions directory. (or)
> 2. Run tests and build the distribution (requires Maven): Run
>    ./gradlew build
>    The distribution will be created in the
>    build/gobblin-distribution/distributions directory.
>
> These are taken from
> https://github.com/apache/incubator-gobblin/blob/master/README.md
>
> I did not get a chance to add the Alias annotation; I am a bit busy this
> weekend. I will try to find some time and see if I can test it.
>
> On Fri, Apr 20, 2018 at 5:58 AM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> So far I have failed to build Gobblin. I’m currently stuck on this error:
>
> FAILURE: Build failed with an exception.
>
> * Where:
> Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32
>
> * What went wrong:
> A problem occurred evaluating script.
> > Failed to apply plugin [id 'org.gradle.java']
> > Value is null
>
> Did you have a chance to research adding the annotation
> @Alias("KafkaSimpleJsonExtractor")?
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Saturday, April 14, 2018 12:19 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> Yes, you seem to be correct; the code never lies :)
>
> Can you try implementing your own customised Extractor that extends
> KafkaSimpleJsonExtractor and carries the Alias annotation too?
>
> The other option would be to add the Alias annotation to
> KafkaSimpleJsonExtractor itself. I need to see what implications that will
> have and why it was not done earlier. I need some time to look deeper into
> it, and hope to find some during the weekend.
>
> On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> Our correspondence helps with understanding Gobblin. :)
>
> We’re focusing on the source, so no writers are involved yet. We want to
> extract both the key and value from each Kafka message.
>
> There are three classes that extend the abstract class KafkaSource:
>
> KafkaDeserializerSource,
> KafkaSimpleSource and
> UniversalKafkaSource.
>
> Each has a method, getExtractor(), that returns these respective instances:
>
> KafkaDeserializerExtractor;
> KafkaSimpleExtractor and
> a class found by ClassAliasResolver.resolveClass.
>
> The first two have a method, decodeRecord(), that takes a
> ByteArrayBasedKafkaRecord parameter, and each calls
> ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls
> ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work.
>
> The UniversalKafkaSource class looks interesting because it will search
> for a class that extends KafkaExtractor and instantiate it. But there’s a
> catch: the class must be annotated with @Alias. If class
> KafkaSimpleJsonExtractor were annotated with an @Alias, I think I could use
> the UniversalKafkaSource along with the property
> gobblin.source.kafka.extractorType.
> As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both
> ByteArrayBasedKafkaRecord.getKeyBytes() and
> ByteArrayBasedKafkaRecord.getMessageBytes(), which is the result we seek.
>
> Or did we miss something?
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Friday, April 13, 2018 12:01 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> Do you not have this configuration?
>
> ***********************************************************************
> writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
> writer.file.path.type=tablename
> writer.destination.type=HDFS
> writer.output.format=txt
> ***********************************************************************
>
> I scanned the code for SimpleDataWriterBuilder, which uses
> SimpleDataWriter to write the byte array to HDFS (I don't see the HDFS
> configuration, so I assume that the file is only in HDFS format and is
> copied to the Hadoop file system after it is created).
>
> It seems that you want to store the key/value in JSON format (or any
> other format) and not the way it is done by the default SimpleDataWriter.
> You may look into configuring the KafkaDeserializerExtractor with the
> required deserializer type, JSON or other.
>
> Once this is done, you can use KafkaSimpleJsonExtractor, which was what
> you asked about at the beginning.
>
> So to conclude, what I understand is that you don't want the data to be
> stored as a byte array; you require JSON (or another format).
>
> On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> Thank you for your time spent on our question.
>
> This quick start document,
> https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
> shows
> “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”.
> That class overrides the method getExtractor to return “new
> KafkaSimpleExtractor(state)”, and class KafkaSimpleExtractor overrides the
> method decodeRecord to return just the Kafka message value.
>
>     return kafkaConsumerRecord.getMessageBytes()
>
> Since we want both key and value stored in HDFS, class
> KafkaSimpleJsonExtractor appears to provide the desired decodeRecord
> method:
>
> protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
>     long offset = messageAndOffset.getOffset();
>
>     byte[] keyBytes = messageAndOffset.getKeyBytes();
>     String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
>
>     byte[] payloadBytes = messageAndOffset.getMessageBytes();
>     String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
>
>     KafkaRecord record = new KafkaRecord(offset, key, payload);
>
>     byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
>     return decodedRecord;
> }
>
> The architecture diagram leads us to think that the extractor is the point
> where both Kafka key and value are visible in the Gobblin pipeline:
> https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs
>
> We are eager to learn from you.
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 9:45 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> For writing the data to HDFS you need to take a look at the Writer
> implementation; please look for it. The Extractor is used for pulling the
> data, which is thereafter passed through various stages before it is stored
> via the Writer.
>
> What I meant by the other subscriber was to add one more consumer that
> will read the key/value, e.g. kafka-console-consumer.sh, which comes with
> Kafka.
>
> I was not aware earlier that you wanted to store the data in HDFS; you
> had given the reference to the Extractor, so it was not clear to me.
>
> Please take a look at the Kafka to HDFS writer and see if that helps. In
> case it does not do exactly what you require, you may have to plug in a
> customized writer.
>
> On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> We want to extract both key and value from a Kafka message and publish the
> combined information to HDFS. Keys contain numeric ids for the strings
> contained in values.
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
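
To make the decodedRecord step above concrete, here is a minimal, self-contained sketch of combining a Kafka key and value into one JSON record. It is not Gobblin code: the class name KeyValueJsonSketch and the hand-rolled quote() helper are hypothetical, the JSON field names (offset, key, payload) are an assumption based on the KafkaRecord constructor arguments quoted above, and the real extractor delegates serialization to Gson, whose escaping may differ in details.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the decodeRecord logic quoted in the thread.
final class KeyValueJsonSketch {
  private static final Charset CHARSET = StandardCharsets.UTF_8;

  // Minimal JSON string escaping (assumption: the real class leaves this to Gson).
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.append('"').toString();
  }

  // Mirrors the quoted method: a null key or payload becomes an empty string,
  // and offset, key, and payload are emitted together as one JSON object.
  static byte[] decodeRecord(long offset, byte[] keyBytes, byte[] payloadBytes) {
    String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
    String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
    String json = "{\"offset\":" + offset
        + ",\"key\":" + quote(key)
        + ",\"payload\":" + quote(payload) + "}";
    return json.getBytes(CHARSET);
  }

  public static void main(String[] args) {
    byte[] out = decodeRecord(42L, "id-7".getBytes(CHARSET), "hello".getBytes(CHARSET));
    System.out.println(new String(out, CHARSET));
  }
}
```

The point of the sketch is only that the key survives into the record handed to the rest of the pipeline, which is what the plain KafkaSimpleExtractor path drops.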
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have another subscriber to Kafka that renders the values,
> rather than having the KafkaExtractor implementation render them?
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:
>
> Hi,
>
> Using gobblin_0.10.0, we want to use the module KafkaSimpleJsonExtractor.java
> in order to see both Kafka record keys and values.
>
> How does one configure a job to achieve it?
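
The answer reached later in the thread (quoted earlier in this message) has two parts: the job sets source.class to UniversalKafkaSource and gobblin.source.kafka.extractorType to an alias, and the extractor class must carry a matching @Alias annotation. The sketch below illustrates that lookup in a self-contained way. Only the two property names and the alias string come from the thread; the Alias annotation, BaseExtractor, the explicit candidate list, and resolve() are hypothetical stand-ins, and Gobblin's own ClassAliasResolver may discover classes quite differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class AliasLookupSketch {

  // Hypothetical stand-in for gobblin.annotation.Alias.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface Alias {
    String value();
  }

  // Hypothetical stand-in for the KafkaExtractor base class.
  static class BaseExtractor {}

  @Alias("KafkaSimpleJsonExtractor")
  static class JsonExtractor extends BaseExtractor {}

  // No @Alias here, so a lookup by alias can never find this class.
  static class UnaliasedExtractor extends BaseExtractor {}

  // Assumption: candidates are listed explicitly instead of scanning the classpath.
  static List<Class<? extends BaseExtractor>> candidates() {
    List<Class<? extends BaseExtractor>> list = new ArrayList<>();
    list.add(JsonExtractor.class);
    list.add(UnaliasedExtractor.class);
    return list;
  }

  // Returns the first candidate whose @Alias value equals the requested alias.
  static Class<? extends BaseExtractor> resolve(
      String alias, List<Class<? extends BaseExtractor>> candidates) {
    for (Class<? extends BaseExtractor> candidate : candidates) {
      Alias annotation = candidate.getAnnotation(Alias.class);
      if (annotation != null && annotation.value().equals(alias)) {
        return candidate;
      }
    }
    throw new IllegalArgumentException("No extractor with alias: " + alias);
  }

  // The two job properties reported as working in the thread (0.10.0 package names).
  static Properties jobProperties() {
    Properties props = new Properties();
    props.setProperty("source.class",
        "gobblin.source.extractor.extract.kafka.UniversalKafkaSource");
    props.setProperty("gobblin.source.kafka.extractorType", "KafkaSimpleJsonExtractor");
    return props;
  }

  public static void main(String[] args) {
    String alias = jobProperties().getProperty("gobblin.source.kafka.extractorType");
    System.out.println(resolve(alias, candidates()).getSimpleName());
  }
}
```

In this sketch, asking for an alias that no class declares fails with an exception, which mirrors why the unpatched KafkaSimpleJsonExtractor could not be selected through the extractorType property.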