Hi Kyle,

I'm not sure I understand the problem. I assume you have one sink for each Avro type (Kafka topic). If you have multiple sinks, why is it not possible to configure each one with the correct Avro schema?
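A minimal sketch of that setup, assuming Flink's `BucketingSink` with the same `Writer` interface and parquet-avro classes used in the code quoted below. The writer takes the schema as a JSON string in its constructor (the approach described in the quoted message) and passes it on in `duplicate()`, so every copy of a sink's writer uses that sink's schema. The class name `SchemaParquetSinkWriter` is hypothetical. The schema is kept as a `String` and parsed in `open` because the writer must be serializable, and depending on the Avro version `Schema` may not be.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical writer: one instance per sink, schema fixed when the sink is built.
class SchemaParquetSinkWriter[T <: GenericRecord](schemaString: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    // Parse here instead of storing a Schema field, to keep the writer serializable.
    val schema = new Schema.Parser().parse(schemaString)
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def getPos: Long = writer.getDataSize

  // ParquetWriter has no public flush(); like the quoted code, report the data size.
  override def flush(): Long = writer.getDataSize

  override def close(): Unit = writer.close()

  // Pass the schema along so every duplicate writes the same schema.
  override def duplicate(): SchemaParquetSinkWriter[T] =
    new SchemaParquetSinkWriter[T](schemaString)
}
```

Each topic's sink would then get its own instance (for example via `BucketingSink.setWriter`), built with that topic's schema string.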
Best,
Fabian

2018-01-05 22:11 GMT+01:00 Kyle Hamlin <[email protected]>:

> I implemented an Avro-to-Parquet writer that previously took an Avro
> schema as a string in its constructor and passed it to the
> AvroParquetWriter. Now I'm wondering if there is a way to get the schema
> from the element and pass it to the AvroParquetWriter. I tried grabbing the
> schema from the element in the write method, but write is called after
> open, so that doesn't seem to work. I need to do this because I'm sinking
> several Kafka topics in one app to S3, so different messages need a different
> schema passed to the writer.
>
> class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {
>
>   @transient private var writer: ParquetWriter[T] = _
>   @transient private var schema: Schema = _
>
>   override def write(element: T): Unit = {
>     schema = element.getSchema
>     writer.write(element)
>   }
>
>   override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()
>
>   override def close(): Unit = writer.close()
>
>   override def getPos: Long = writer.getDataSize
>
>   override def flush(): Long = writer.getDataSize
>
>   override def open(fs: FileSystem, path: Path): Unit = {
>     writer = AvroParquetWriter.builder[T](path)
>       .withSchema(schema)
>       .withCompressionCodec(CompressionCodecName.SNAPPY)
>       .build()
>   }
>
> }
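One way to answer the question as asked (a different technique from the one-sink-per-topic approach in the reply above) is to defer building the `AvroParquetWriter` until the first `write` call, when the element and its schema are available. This is a minimal sketch assuming the same Flink `Writer` interface and parquet-avro classes as the quoted code; the class name `LazyParquetSinkWriter` is hypothetical. It also assumes every record written to one part file shares a schema (for example, because the sink buckets by topic); a record with a different schema would not match the schema the file was created with.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical writer: builds the Parquet writer lazily on the first element,
// so the Avro schema can be taken from the data itself.
class LazyParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var path: Path = _
  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    // Only remember the target path; the schema is not known yet.
    this.path = path
    this.writer = null
  }

  override def write(element: T): Unit = {
    if (writer == null) {
      // First element for this part file: its schema is used for the whole file.
      writer = AvroParquetWriter.builder[T](path)
        .withSchema(element.getSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
    }
    writer.write(element)
  }

  // Nothing has been written before the first element arrives.
  override def getPos: Long = if (writer == null) 0L else writer.getDataSize

  // ParquetWriter has no public flush(); like the quoted code, report the data size.
  override def flush(): Long = getPos

  override def close(): Unit = {
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def duplicate(): LazyParquetSinkWriter[T] = new LazyParquetSinkWriter[T]()
}
```

Because `open` no longer creates the file, a part file that never receives a record is never created; whether that matters depends on how the sink handles empty part files, so it is worth checking against the sink in use.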
