??????????????????from_pandas????????????????????????????????????????
????????????????????????8??????400??????????????????env.set_parallelism(8)??400????????????????12????????????????????????
1.????????????output??????????????8????????????????????1????????????????????????????????????????????????????????????????????????????????????????????????????1??????????
2.arrow.batch.size????????????????????????????????????????arrow.batch.size????????????????????????
3.??????????????????????????????????????arrow.batch.size????????????????????????????????????????????
4.??????????12??24??????CPU????????????????????????????????????????12????
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??10??26??(??????) ????8:47
??????: "user-zh"<[email protected]>;
????: Re: pyflink ????????????????????????????????
Hi??
1.
from_pandas????????????????????????????????????????????flink??csv??connector??????????????????
2.
arrow.batch.size????????????????????????????????pandas.series??????????????udf??????????????
Best,
Xingbo
???????? <[email protected]> ??2020??10??26?????? ????4:32??????
> ??????????????????
> ??????????????????
> ??????????
>
??????????????????????????????????????????????13782492????????????????????????????????????4000??????????????????????id????0??????id??
> ????????????
>
??????????????????????????????????????????????????????????????id??????????????????????100m??????????????????????????100m????????-1??
> ??????????????????
> import&nbsp;pandas as&nbsp;pd
> import&nbsp;numpy as&nbsp;np
> from&nbsp;pyflink.datastream import&nbsp;StreamExecutionEnvironment
> from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment,
DataTypes
> from&nbsp;pyflink.table.descriptors import&nbsp;Schema, OldCsv,
FileSystem
> from&nbsp;pyflink.table.udf import&nbsp;udf
> import&nbsp;os
> import&nbsp;time
> # ??????????????????????????????1??batchsize??10??????????????????????????
>
> env =&nbsp;StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env =&nbsp;StreamTableEnvironment.create(env)
>
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
>
t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> '100000')
> # ??????????
> if&nbsp;os.path.exists('output'):
> &nbsp;&nbsp;&nbsp; os.remove('output')
>
> t_env.connect(FileSystem().path('output')) \
> &nbsp;&nbsp;&nbsp; .with_format(OldCsv()
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .with_schema(Schema()
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink')
> # ????????????????????
> data =&nbsp;pd.read_csv(r'D:\??????\????????\data\trip\graph_data.csv')
> coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> data['O_Y'])))).T
> coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> data['D_Y'])))).T
> coor =&nbsp;coor_o.append(coor_d).drop_duplicates()
> coor.columns =&nbsp;['lng', 'lat']
> coor =&nbsp;coor.sort_index()
> coor =&nbsp;coor.to_numpy()
> # udf??????????
>
>
>
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &nbsp;&nbsp;&nbsp;&nbsp;
DataTypes.ARRAY(DataTypes.FLOAT()),
> DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
> &nbsp;&nbsp;&nbsp; temp
=&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>
+np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &nbsp;&nbsp;&nbsp; distance
=&nbsp;2*np.arctan2(np.sqrt(temp),
> np.sqrt(1-temp))
> &nbsp;&nbsp;&nbsp; distance =&nbsp;distance*3958.8*1609.344
>
> &nbsp;&nbsp;&nbsp; buffer=100
> &nbsp;&nbsp;&nbsp; if&nbsp;(distance
<=&nbsp;buffer).sum() &gt;&nbsp;0:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return&nbsp;distance.argmin()
> &nbsp;&nbsp;&nbsp; else:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return&nbsp;-1
> # ????????????????
>
> df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
> # ????????
> t_env.from_pandas(use_data) \
> &nbsp;&nbsp;&nbsp;&nbsp;
.select("distance_meters(pickup_longitude,
> pickup_latitude)") \
> &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
> # ??????????
>
> start_time =&nbsp;time.time()
> t_env.execute("tutorial_job")
> print(time.time() -&nbsp;start_time)
> ????????CPU??12??24????
>
>
>
>
>
>
>
>
>
>
>
??????????????????????????????????2607????43????????????????????????????????????????????????????????????????1??????????10????????????????????????..
>
????????????????????????8????????????????????8??????????????????????????????????????????????????????????????????????
> ??????????????????????????????????????????????batch.size??
> ????????????????????