Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?
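For reference, here is a minimal sketch of the setup suggested earlier in the thread: checkpointing enabled, the bundled StreamingFileSink with a Parquet bulk writer, and a final env.execute(). It assumes Flink 1.6+ with the flink-parquet dependency on the classpath; the checkpoint interval, output path and source placeholder are illustrative only, not taken from your job:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import scala.io.Source

object ParquetSinkSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Bulk formats (like Parquet) roll the in-progress part file on every checkpoint,
  // so without checkpointing no part file is ever finalised.
  env.enableCheckpointing(10000L) // illustrative interval

  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(new Path("file:///tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  // Replace with your real Kafka source mapped to GenericRecord.
  val stream: DataStream[GenericRecord] = ???

  stream.addSink(sink)

  // Without this call the job graph is only built, never submitted.
  env.execute("parquet-sink-sketch")
}

The part files only become visible once a checkpoint completes, which is why the job needs to run long enough for at least a few checkpoints to finish.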
Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Avi,
>
> In the last snippet that you posted, you have not activated checkpoints.
>
> Checkpoints are needed for the StreamingFileSink to produce results,
> especially in the case of BulkWriters (like Parquet), where the part file
> is rolled upon reception of a checkpoint and the part is finalised
> (i.e. "committed") when the checkpoint gets completed successfully.
>
> Could you please enable checkpointing and make sure that the job runs
> long enough for at least some checkpoints to be completed?
>
> Thanks a lot,
> Kostas
>
> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Check out this little app. You can see that the file is created but no
>> data is written, even for a single record.
>>
>> import io.eels.component.parquet.ParquetWriterConfig
>> import org.apache.avro.Schema
>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.avro.AvroParquetWriter
>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>> import scala.io.Source
>> import org.apache.flink.streaming.api.scala._
>>
>> object Tester extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   def now = System.currentTimeMillis()
>>   val path = new Path(s"test-$now.parquet")
>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>   val config = ParquetWriterConfig()
>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>   genericReocrd.put("name", "test_b")
>>   genericReocrd.put("code", "NoError")
>>   genericReocrd.put("ts", 100L)
>>   val stream = env.fromElements(genericReocrd)
>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>     .withSchema(schema)
>>     .withCompressionCodec(compressionCodecName)
>>     .withPageSize(config.pageSize)
>>     .withRowGroupSize(config.blockSize)
>>     .withDictionaryEncoding(config.enableDictionary)
>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>     .withValidation(config.validating)
>>     .build()
>>
>>   writer.write(genericReocrd)
>>   stream.addSink { r =>
>>     println(s"In Sink $r")
>>     writer.write(r)
>>   }
>>   env.execute()
>>   // writer.close()
>> }
>>
>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>
>>> Can you try closing the writer?
>>>
>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>> snapshot() (since you are checkpointing, this method will be called).
>>>
>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>
>>>> Thanks Rafi,
>>>> I am actually not using assignTimestampsAndWatermarks; I will try to
>>>> add it as you suggested. However, it seems that the messages are
>>>> repeating in the stream over and over: even if I push a single message
>>>> manually to the queue, that message repeats indefinitely.
>>>>
>>>> Cheers
>>>> Avi
>>>>
>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>> If this part is not set properly, it's possible that watermarks are
>>>>> not sent and nothing will be written to your Sink.
>>>>>
>>>>> See here for more details:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>
>>>>> Hope this helps,
>>>>> Rafi
>>>>>
>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>> sink, but it seems like the stream is repeating itself in an endless
>>>>>> loop and the Parquet file is not written. Can someone please help me
>>>>>> with this?
>>>>>>
>>>>>> object ParquetSinkWriter{
>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>   private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>   private val config = ParquetWriterConfig()
>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>     .withSchema(avroSchema)
>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>     .withPageSize(config.pageSize)
>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>     .withValidation(config.validating)
>>>>>>     .build()
>>>>>> }
>>>>>>
>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>   import ParquetSinkWriter._
>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> // main app
>>>>>> object StreamingJob extends App {
>>>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>   env.enableCheckpointing(500)
>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>   env.setStateBackend(backend)
>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>   stream2.map { r =>
>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>     genericReocrd
>>>>>>   }.addSink(writer)
>>>>>>
>>>>>> Thanks for your help
>>>>>> Avi
>>>
>>> --
>>> Thanks,
>>> Vipul
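
If you do want to keep a hand-rolled SinkFunction rather than the StreamingFileSink, a rough sketch of vipul's earlier suggestion (rolling the AvroParquetWriter on each checkpoint via CheckpointedFunction) could look like the one below. The class and file naming are made up for illustration, the schema is passed as a string and parsed in open() to avoid serializing the Schema object, and this gives none of the exactly-once guarantees of the bundled sink:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

class CheckpointedParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    writer = newWriter()
  }

  // One file per roll; closing a ParquetWriter is what flushes its buffered
  // row groups and writes the footer, so a still-open file looks empty on disk.
  private def newWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](new Path(s"$basePath/part-${System.currentTimeMillis()}.parquet"))
      .withSchema(schema)
      .build()

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Roll the file on every checkpoint, as suggested above.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    writer = newWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}

Records written after the last completed checkpoint can still be lost on failure with this approach, which is exactly the gap the StreamingFileSink's pending/finalised part-file handling closes.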