This issue is covered in your other ML thread "java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign".

Let's move further discussion there so we don't have two threads running in parallel for the same problem.
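
A side note for anyone hitting the same trace: a NoSuchMethodError on KafkaConsumer.assign(Ljava/util/List;)V means the KafkaConsumer class loaded at runtime has no assign method taking a java.util.List. That usually points to a kafka-clients jar on the classpath that differs from the one the connector was compiled against. The sketch below is only a diagnostic idea, not part of Flink or Kafka: KafkaClasspathCheck, methodSignatures and loadedFrom are names made up for illustration, and it relies on nothing but JVM reflection. It prints where KafkaConsumer was loaded from and which assign overloads that class declares.

```scala
import scala.util.Try

// Hypothetical helper (not a Flink or Kafka API): inspects the runtime classpath via reflection.
object KafkaClasspathCheck {

  // Signatures of all public methods with the given name,
  // or an empty list if the class is not on the classpath.
  def methodSignatures(className: String, methodName: String): Seq[String] =
    Try(Class.forName(className)).toOption.toList
      .flatMap(_.getMethods.toList)
      .filter(_.getName == methodName)
      .map(_.toGenericString)

  // Jar or directory the class was loaded from, if the JVM reports one.
  def loadedFrom(className: String): Option[String] =
    for {
      cls      <- Try(Class.forName(className)).toOption
      domain   <- Option(cls.getProtectionDomain)
      source   <- Option(domain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString

  def main(args: Array[String]): Unit = {
    val consumer = "org.apache.kafka.clients.consumer.KafkaConsumer"
    println("KafkaConsumer loaded from: " + loadedFrom(consumer).getOrElse("not found or unknown"))
    // A connector that calls assign(List) needs a matching overload to show up here.
    methodSignatures(consumer, "assign").foreach(println)
  }
}
```

This only tells you something if it runs with the same classpath as the job, for example packaged into the job jar or started with the jars from Flink's lib directory; how that is arranged depends on your setup.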

On 03.07.2018 09:21, Mich Talebzadeh wrote:
Thanks Hequn and Jörn, that helped.

But I am still getting this error for a simple streaming program at execution!

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem

object md_streaming
{
  def main(args: Array[String])
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    //properties.setProperty("zookeeper.connect", "rhes75:2181")
    properties.setProperty("group.id", "md_streaming")
    properties.setProperty("auto.offset.reset", "latest")
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
    env.execute("Flink Kafka Example")
  }
}


Completed compiling
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at md_streaming$.main(md_streaming.scala:32)
        at md_streaming.main(md_streaming.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)



Dr Mich Talebzadeh

LinkedIn /https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/

http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.



On Tue, 3 Jul 2018 at 07:20, Jörn Franke <jornfra...@gmail.com> wrote:

    import org.apache.flink.core.fs.FileSystem


    On 3. Jul 2018, at 08:13, Mich Talebzadeh
    <mich.talebza...@gmail.com> wrote:

    thanks Hequn.

    When I use as suggested, I am getting this error

    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:30: not found: value FileSystem
    [error] .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
    [error]                                       ^
    [error] one error found



    On Tue, 3 Jul 2018 at 03:16, Hequn Cheng <chenghe...@gmail.com> wrote:

        Hi Mich,

        It seems the writeMode has not been set correctly. Have you
        tried the following?

            .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE);


        On Mon, Jul 2, 2018 at 10:44 PM, Mich Talebzadeh
        <mich.talebza...@gmail.com> wrote:

            Flink 1.5

            This streams data from Kafka and writes it to a file:

               val stream = env
                 .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
                 .writeAsText("/tmp/md_streaming.txt")
               env.execute("Flink Kafka Example")

            The error states:

            Caused by: java.io.IOException: File or directory
            /tmp/md_streaming.txt already exists. Existing files and
            directories are not overwritten in NO_OVERWRITE mode. Use
            OVERWRITE mode to overwrite existing files and directories.

            Is there an append mode in writeAsText? I tried OVERWRITE
            but it did not work.

            Thanks



