Yes, you seem to be correct, the code never lies :)

Can you try implementing your customised Extractor which extends
KafkaSimpleJsonExtractor and has the @Alias annotation too?
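
Something like this minimal sketch is what I have in mind (untested; the
class name and alias value are placeholders, and the package/constructor
details are from memory, so please verify them against the code):

import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleJsonExtractor;

// Discoverable by UniversalKafkaSource via ClassAliasResolver, while
// reusing KafkaSimpleJsonExtractor.decodeRecord(), which already combines
// the key bytes and message bytes into one JSON record.
@Alias(value = "KeyValueJsonExtractor")
public class KeyValueJsonExtractor extends KafkaSimpleJsonExtractor {

    public KeyValueJsonExtractor(WorkUnitState state) {
        super(state);
    }
}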

The other option would be to add the @Alias annotation to
KafkaSimpleJsonExtractor itself. I need to see what implications that would
have and why it was not done earlier. I need some time to look deeper into
it; I hope to find some time during the weekend.




On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <jack_kidwel...@comcast.com>
wrote:

> Our correspondence helps with understanding Gobblin. :)
>
>
>
> We’re focusing on the source, so no writers are involved yet. We want to
> extract both the key and value from each kafka message.
>
>
>
> There are three classes that extend abstract class, KafkaSource:
>
> KafkaDeserializerSource,
>
> KafkaSimpleSource and
>
> UniversalKafkaSource.
>
>
>
> Each has method, getExtractor(), that returns these respective instances:
>
>
>
> KafkaDeserializerExtractor;
>
> KafkaSimpleExtractor and
>
> a class found by ClassAliasResolver.resolveClass().
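>
> (Presumably that lookup is something along the lines of
> new ClassAliasResolver<>(KafkaExtractor.class).resolveClass(extractorTypeValue),
> though we have not traced the exact call.)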
>
>
>
> The first two have method, decodeRecord(), that takes parameter,
> ByteArrayBasedKafkaRecord, and each calls
> ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls
> ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work.
>
>
>
> The UniversalKafkaSource class looks interesting because it will search
> for a class that extends KafkaExtractor and instantiate it. But there’s a
> catch: the class must be annotated with @Alias. If class
> KafkaSimpleJsonExtractor were annotated with an @Alias, I think I could use
> the UniversalKafkaSource along with property,
> gobblin.source.kafka.extractorType.
> As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both
> ByteArrayBasedKafkaRecord.getKeyBytes() and
> ByteArrayBasedKafkaRecord.getMessageBytes(), the result we seek.
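>
> If that worked, we imagine the job config would look something like this
> (the UniversalKafkaSource package is assumed to match KafkaSimpleSource's,
> and the extractorType value would be the name declared in the @Alias
> annotation):
>
>   source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
>   gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor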
>
>
>
> Or did we miss something?
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Friday, April 13, 2018 12:01 AM
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Do you not have this configuration?
>
> ***********************************************************************
>
> writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
>
> writer.file.path.type=tablename
>
> writer.destination.type=HDFS
>
> writer.output.format=txt
>
> ***********************************************************************
>
>
> I scanned the code for the SimpleDataWriterBuilder, which uses the
> SimpleDataWriter that writes the byte array to HDFS (I don't see the
> HDFS configuration, so I assume that the file is only in HDFS format and
> it would be copied to the Hadoop file system after it is created).
>
>
>
>
>
> It seems that you want to store the key/value in JSON format (or any
> other format) and not the way it is done by the default
> SimpleDataWriter. You may look at configuring the KafkaDeserializerExtractor
> with the required deserializer type set to JSON (or other).
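>
> Something along these lines, I believe (I have not verified the exact
> property name and the supported values; please check the
> KafkaDeserializerExtractor code):
>
>   source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
>   kafka.deserializer.type=GSON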
>
>
>
> Once this is done, you can use KafkaSimpleJsonExtractor, which is what
> was asked about at the beginning.
>
>
>
> So to conclude, what I understand is that you don't want the data to be
> stored as a raw byte array; you require JSON (or another format).
>
>
> On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> Thank you for your time spent on our question.
>
>
>
> This quick start document,
>
> https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
>
> shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”.
> That class overrides method, getExtractor, to return “new
> KafkaSimpleExtractor(state)”, and class, KafkaSimpleExtractor, overrides
> method, decodeRecord, to return just the Kafka message value.
>
>
>
>   return kafkaConsumerRecord.getMessageBytes()
>
>
>
> Since we want both key and value stored in HDFS, class,
> KafkaSimpleJsonExtractor, appears to provide the desired decodeRecord
> method:
>
>
>
> protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset)
>         throws IOException {
>     long offset = messageAndOffset.getOffset();
>
>     // The key may be absent, so fall back to an empty string.
>     byte[] keyBytes = messageAndOffset.getKeyBytes();
>     String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
>
>     // The message (value) bytes, with the same null guard.
>     byte[] payloadBytes = messageAndOffset.getMessageBytes();
>     String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
>
>     // Combine offset, key and payload into one record, serialized as JSON.
>     KafkaRecord record = new KafkaRecord(offset, key, payload);
>
>     byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
>     return decodedRecord;
> }
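>
> So each Kafka message would land in HDFS as one JSON object, presumably
> something like this (field names assumed from the KafkaRecord constructor
> arguments):
>
>   {"offset":42,"key":"123","payload":"some text value"}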
>
>
>
> The architecture diagram leads us to think that the extractor is the point
> where both Kafka key and value are visible in the Gobblin pipeline:
>
> https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs
>
>
>
> We are eager to learn from you.
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 9:45 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> For writing the data to HDFS you need to take a look at the Writer
> implementation; please look for it. The Extractor is used for pulling the
> data, which is thereafter passed through various stages before it is stored
> via the Writer.
>
> What I meant by the other subscriber was to add one more consumer which
> will read the key/value, e.g. kafka-console-consumer.sh, which comes with
> Kafka.
>
> I was not earlier aware that you wanted to store the data in HDFS; since you
> had given the reference of the Extractor, it was not clear to me.
>
> Please take a look at the Kafka to HDFS writer and see if that helps. In
> case it does not do exactly what you require, you may have to
> plug in a customized writer.
>
>
>
>
>
> On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <jack_kidwel...@comcast.com>
> wrote:
>
> We want to extract both key and value from a Kafka message and publish the
> combined information to HDFS. Keys contain numeric ids for the strings
> contained in values.
>
>
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
>
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
>
>
>
> *From:* Vicky Kak [mailto:vicky....@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have another subscriber to Kafka which renders the
> values, rather than having the KafkaExtractor implementation render them?
>
>
>
>
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <
> jack_kidwel...@comcast.com> wrote:
>
> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java,
> in order to see both Kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>
>
>
