Hi,

I would like to < unit test > a job flink with Kafka as source (and Sink). I am 
trying to use the library scalatest-embedded-kafka to simulate a Kafka for my 
test.

For example, I would like to get data (string stream) from Kafka, convert it 
intro uppercase and put it into another topic.

Now, I am just trying to use Flink's kafka consumer to read into a topic (with 
embedded kafka).

Here is the code for example :

```scala

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}
import scala.util.Random

object SimpleFlinkKafkaTest {
  SimpleFlinkKafkaTest
  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(1000000).toString
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
   "bootstrap.servers" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "auto.offset.reset" -> "earliest",
    "group.id" -> groupId,
    "key.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"

}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      implicit val config = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
        customConsumerProperties = SimpleFlinkKafkaTest.propsMap
      )

      withRunningKafka {

        publishStringMessageToKafka("input-topic", 
SimpleFlinkKafkaTest.inputString)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011(
          "input-topic",
          new SimpleStringSchema,
          SimpleFlinkKafkaTest.props
        )

        val inputStream = env.addSource(kafkaConsumer)

        val outputStream = inputStream.map { msg =>
          msg.toUpperCase
        }

        outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

        env.execute()

        consumeFirstStringMessageFrom("output-topic") shouldEqual 
SimpleFlinkKafkaTest.expectedString

      }
    }
  }
}
```

The flink process si running but nothing happen. I try ot write into a text 
file to see any output but there is nothing into the file.

Any idea ? Does anybody use this library to test a Flink Job using Kafka ?

Thanks in advance,

Thomas

Reply via email to