So far I have failed to build Gobblin. I'm currently stuck on this error:
FAILURE: Build failed with an exception.

* Where:
Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32

* What went wrong:
A problem occurred evaluating script.
> Failed to apply plugin [id 'org.gradle.java']
> Value is null

Did you have a chance to research adding the annotation @Alias("KafkaSimpleJsonExtractor")?

From: Vicky Kak [mailto:vicky....@gmail.com]
Sent: Saturday, April 14, 2018 12:19 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Yes, you seem to be correct; the code never lies :)

Can you try implementing your own customised Extractor that extends KafkaSimpleJsonExtractor and carries the @Alias annotation too? The other option would be to add the @Alias annotation to KafkaSimpleJsonExtractor itself; I need to see what implications that would have and why it was not done earlier. I require some time to look deeper into it, and hope to find some time during the weekend.

On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:

Our correspondence helps with understanding Gobblin. ☺

We're focusing on the source, so no writers are involved yet. We want to extract both the key and the value from each Kafka message.

Three classes extend the abstract class KafkaSource: KafkaDeserializerSource, KafkaSimpleSource and UniversalKafkaSource. Each has a getExtractor() method that returns, respectively, a KafkaDeserializerExtractor, a KafkaSimpleExtractor, or a class found by ClassAliasResolver.resolveClass(). The first two have a decodeRecord() method that takes a ByteArrayBasedKafkaRecord parameter, and each calls ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won't work for us. The UniversalKafkaSource class looks interesting because it searches for a class that extends KafkaExtractor and instantiates it. But there's a catch: the class must be annotated with @Alias.
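The customised-Extractor suggestion above might look something like the following sketch. It is not compiled against Gobblin: the constructor signature, the alias string, and the org.apache.gobblin package names are assumptions (older 0.10.0 releases used un-prefixed gobblin.* packages), so check them against the actual KafkaSimpleJsonExtractor source.

```java
package com.example.gobblin;  // hypothetical package

import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleJsonExtractor;

// A thin subclass whose only purpose is to carry the @Alias annotation,
// so that UniversalKafkaSource can locate it via ClassAliasResolver.resolveClass().
@Alias("KeyValueJsonExtractor")
public class KeyValueJsonExtractor extends KafkaSimpleJsonExtractor {

  public KeyValueJsonExtractor(WorkUnitState state) {
    super(state);  // decodeRecord() is inherited unchanged: it emits offset, key and payload as JSON
  }
}
```

With such a class on the job classpath, setting gobblin.source.kafka.extractorType to the alias value should let UniversalKafkaSource instantiate it.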
If class KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use UniversalKafkaSource along with the property gobblin.source.kafka.extractorType. As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(), which is the result we seek. Or did we miss something?

From: Vicky Kak [mailto:vicky....@gmail.com]
Sent: Friday, April 13, 2018 12:01 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Don't you have this configuration?

***********************************************************************
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
***********************************************************************

I scanned the code for SimpleDataWriterBuilder, which uses the SimpleDataWriter that writes the byte array to HDFS. (I don't see the HDFS configuration, so I assume the file is only in HDFS format and would be copied to the Hadoop file system after it is created.) It seems that you want to store the key/value in JSON (or some other format) and not the way it is done by the default SimpleDataWriter. You may look at configuring the KafkaDeserializerExtractor with the required deserializer type set to JSON (or another type). Once this is done, you can use KafkaSimpleJsonExtractor, which was asked about at the beginning.

So to conclude, what I understand is that you don't want the data to be stored as a raw byte array; you require JSON (or another format).

On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:

Thank you for your time spent on our question.
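If the @Alias route works out, the job configuration might look something like the fragment below. This is a sketch: the UniversalKafkaSource package path and the alias value are assumptions based on the discussion above, not verified against a running job; only the gobblin.source.kafka.extractorType property name is taken from the thread itself.

```
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
```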
This quick start document, https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/, shows:

    source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource

That class overrides getExtractor() to return "new KafkaSimpleExtractor(state)", and KafkaSimpleExtractor overrides decodeRecord() to return just the Kafka message value:

    return kafkaConsumerRecord.getMessageBytes();

Since we want both key and value stored in HDFS, class KafkaSimpleJsonExtractor appears to provide the desired decodeRecord() method:

    protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
        long offset = messageAndOffset.getOffset();

        byte[] keyBytes = messageAndOffset.getKeyBytes();
        String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

        byte[] payloadBytes = messageAndOffset.getMessageBytes();
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

        KafkaRecord record = new KafkaRecord(offset, key, payload);
        byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
        return decodedRecord;
    }

The architecture diagram leads us to think that the extractor is the point where both the Kafka key and value are visible in the Gobblin pipeline: https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.

From: Vicky Kak [mailto:vicky....@gmail.com]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

For writing the data to HDFS you need to take a look at the Writer implementation, so please look into it. The Extractor is used for pulling the data, which is then passed through various stages before it is stored via the Writer.

What I meant by the "other subscriber" was to add one more consumer that reads the key/value, e.g. the kafka-console-consumer.sh that comes with Kafka.
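To see concretely what decodeRecord() produces, here is a self-contained rendition of the same key-plus-payload combination. The JSON is built by hand instead of with Gson so that it runs without Gobblin on the classpath; the field names mirror what KafkaRecord appears to hold, but the exact serialized shape is an assumption.

```java
import java.nio.charset.StandardCharsets;

// Standalone sketch of the logic in KafkaSimpleJsonExtractor.decodeRecord():
// combine offset, key and payload into a single JSON record, treating a null
// key or payload as the empty string, exactly as the quoted method does.
public class DecodeRecordSketch {

    static String decode(long offset, byte[] keyBytes, byte[] payloadBytes) {
        String key = (keyBytes == null) ? "" : new String(keyBytes, StandardCharsets.UTF_8);
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, StandardCharsets.UTF_8);
        // Hand-built JSON stands in for gson.toJson(new KafkaRecord(offset, key, payload))
        return String.format("{\"offset\":%d,\"key\":\"%s\",\"payload\":\"%s\"}", offset, key, payload);
    }

    public static void main(String[] args) {
        byte[] key = "123".getBytes(StandardCharsets.UTF_8);       // numeric id, as in the thread
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);   // message payload
        System.out.println(decode(42L, key, value));
        // -> {"offset":42,"key":"123","payload":"hello"}
    }
}
```

The point of the exercise: both getKeyBytes() and getMessageBytes() feed the output record, which is why this extractor (unlike KafkaSimpleExtractor) preserves the key.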
I was not aware earlier that you wanted to store the data in HDFS; since you had given the reference to the Extractor, that was not clear to me. Please take a look at the Kafka-to-HDFS writer and see if that helps. In case it does not do exactly what you require, you may have to plug in a customized writer.

On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:

We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain "other subscriber". Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky....@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done? Why can't you have another subscriber to Kafka that renders the values, rather than a KafkaExtractor implementation rendering them?

On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <jack_kidwel...@comcast.com> wrote:

Hi,

Using gobblin_0.10.0, we want to use module KafkaSimpleJsonExtractor.java in order to see both Kafka record keys and values. How does one configure a job to achieve it?
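The "other subscriber" idea, a separate consumer that simply prints each record's key and value outside of Gobblin, could be sketched with the plain Kafka client as below. This is only a sketch: the broker address, group id and topic name are placeholders, and it needs a running broker plus the kafka-clients jar.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// A second, independent subscriber that renders both key and value of each
// message, much like kafka-console-consumer.sh with key printing enabled.
public class KeyValuePrinter {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "key-value-printer");       // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("key=%s value=%s%n", record.key(), record.value());
        }
      }
    }
  }
}
```

This does not replace the Gobblin pipeline; it is only a quick way to confirm that keys and values arrive as expected while the extractor question is being sorted out.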