Hi:

I hope you are doing well! I have a question about PyFlink; the details are below:

Feature: a sliding window of size 10 minutes that slides every 5 minutes is used 
to aggregate data and then do something with the result. Each window holds 
almost 2 GB of data (1 million data items).

Job parameters:

bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
-Dyarn.containers.vcores=4 \
-Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
-Dtaskmanager.memory.managed.fraction=0.7 \
-Dtaskmanager.memory.task.off-heap.size=5120m \
-nm $task_name -qu $queue -d


The exception message is below:

Traceback (most recent call last):
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 253, in _execute
    response = task()
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 978, in process_bundle
    element.data)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 627, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, 
in_stream.read_var_int64())
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 638, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, 
[next(self._batch_reader)])
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
 line 631, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 137, in open_stream
    return RecordBatchStreamReader(source)
  File 
"/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
 line 61, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 352, in 
pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch


Many Thanks!




 

回复