Dear Team,
I am new to PyFlink and would appreciate your support with an issue I am
facing. I am using PyFlink version 1.14.4 and reference code from the
PyFlink GitHub examples.
I am getting the following error:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1651051695.104000000","description":"Error received from peer ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.flink.types.Row
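If I understand correctly, [B is the JVM notation for a Java byte array, so
it looks as if the operator in front of the Kafka sink is receiving pickled
bytes instead of Row objects, but I am not sure why.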
Below is my code for reference:
import json
import logging
import os
import sys

from pyflink.common import Types, JsonRowSerializationSchema, Row, CsvRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer
import math


def show(ds, env):
    ds.print()
    env.execute()


def basic_operations():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Kafka connector jar placed next to this script
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))

    ds = env.from_collection(
        collection=[
            ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000),
            ('user1', 4, 25000), ('user2', 5, 7000),
            ('user3', 8, 7000),
            ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000),
            ('user1', 17, 20000), ('user2', 18, 40000),
            ('user3', 20, 10000), ('user1', 21, 2000), ('user2', 22, 4000),
            ('user3', 33, 1000), ('user1', 34, 25000),
            ('user2', 35, 7000), ('user3', 38, 7000)
        ],
        type_info=Types.ROW_NAMED(["id", "info", "value"],
                                  [Types.STRING(), Types.INT(), Types.INT()])
    )

    ds1 = ds.map(lambda x: x)
    ds1.print()

    def update_tel(data):
        # sum the two numeric fields and emit a named Row ('x', 'y')
        test_data = data.info
        test_data += data.value
        res = Row('x', 'y')
        # return Types.ROW(data.id, test_data)
        return res(data.id, test_data)

    # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
    ds = ds.map(update_tel)  # no output_type declared here
    ds.print()

    # ds = ds.map(lambda x: type(x))
    # ds.print()
    # ds = ds.map(lambda x: Row([x]),
    #             output_type=Types.ROW([Types.STRING(), Types.INT()]))
    # ds.print()

    type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
    serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='testing',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9093',
                         'group.id': 'test_group'}
    )
    ds.add_sink(kafka_producer)

    env.execute('basic_operations')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                        format="%(message)s")
    basic_operations()
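From similar threads, I suspect the cause is that my ds.map(update_tel) call
does not declare an output_type, so PyFlink keeps the mapped elements as
pickled bytes and the CsvRowSerializationSchema then fails to cast them to
Row. A minimal sketch of the change I plan to try (untested; it reuses the
same row type as the type_info defined above):

    # Untested sketch: declare the result type explicitly so the mapped
    # elements reach the CSV serializer as Row objects, not pickled bytes.
    ds = ds.map(update_tel,
                output_type=Types.ROW_NAMED(['x', 'y'],
                                            [Types.STRING(), Types.INT()]))

Is this the right fix, or am I missing something else in how the
FlinkKafkaProducer sink should be set up?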