Dear Team,

I am new to PyFlink and would appreciate your support with an issue I am facing. I am using PyFlink version 1.14.4 and working from the reference code in the PyFlink GitHub repository.

I am getting the following error:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.CANCELLED
        details = "Multiplexer hanging up"
        debug_error_string = "{"created":"@1651051695.104000000","description":"Error received from peer ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer hanging up","grpc_status":1}"

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.flink.types.Row

 

Below is my code for reference:

 

import json
import logging
import os
import sys

from pyflink.common import Types, JsonRowSerializationSchema, Row, CsvRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer
import math


def show(ds, env):
    ds.print()
    env.execute()


def basic_operations():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))

    ds = env.from_collection(
        collection=[
            ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000), ('user1', 4, 25000), ('user2', 5, 7000),
            ('user3', 8, 7000),
            ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000), ('user1', 17, 20000), ('user2', 18, 40000),
            ('user3', 20, 10000), ('user1', 21, 2000), ('user2', 22, 4000), ('user3', 33, 1000), ('user1', 34, 25000),
            ('user2', 35, 7000), ('user3', 38, 7000)
        ],
        type_info=Types.ROW_NAMED(["id", "info", "value"], [Types.STRING(), Types.INT(), Types.INT()])
    )
    ds1 = ds.map(lambda x: x)
    ds1.print()

    def update_tel(data):
        # parse the json
        test_data = data.info
        test_data += data.value
        res = Row('x', 'y')
        # return Types.ROW(data.id, test_data)
        return res(data.id, test_data)

    # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
    ds = ds.map(update_tel)
    ds.print()
    # ds = ds.map(lambda x: type(x))
    # ds.print()
    # ds = ds.map(lambda x: Row([x]), output_type=Types.ROW([Types.STRING(), Types.INT()]))
    # ds.print()

    type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
    serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='testing',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9093', 'group.id': 'test_group'}
    )

    ds.add_sink(kafka_producer)

    env.execute('basic_operations')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    basic_operations()
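
From the ClassCastException ([B is the JVM's byte-array type), my guess is that the result of ds.map(update_tel) reaches the Kafka sink as pickled bytes rather than as a Row, perhaps because I have not declared an output_type on that map call. Would something like the following (the explicit output_type here is only my guess) be the correct way to write it?

    # guess: declare the Row type produced by update_tel so the downstream
    # Kafka sink receives Row objects instead of pickled bytes
    ds = ds.map(update_tel,
                output_type=Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()]))

Any pointers on what I am doing wrong would be much appreciated.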

 

 
