Hi,

1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你

Best,
Xingbo

洗你的头 <[email protected]> 于2020年10月26日周一 下午4:32写道:

> 尊敬的开发者您好,
> 我的需求是这样的,
> 拥有数据:
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> 需要做什么?
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> 我现在的代码如下:
> import&nbsp;pandas as&nbsp;pd
> import&nbsp;numpy as&nbsp;np
> from&nbsp;pyflink.datastream import&nbsp;StreamExecutionEnvironment
> from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment, DataTypes
> from&nbsp;pyflink.table.descriptors import&nbsp;Schema, OldCsv, FileSystem
> from&nbsp;pyflink.table.udf import&nbsp;udf
> import&nbsp;os
> import&nbsp;time
> # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
>
> env =&nbsp;StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env =&nbsp;StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> '100000')
> # 输出表创建
> if&nbsp;os.path.exists('output'):
> &nbsp;&nbsp;&nbsp; os.remove('output')
>
> t_env.connect(FileSystem().path('output')) \
> &nbsp;&nbsp;&nbsp; .with_format(OldCsv()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .with_schema(Schema()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink')
> # 交叉口经纬度数据读取
> data =&nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> data['O_Y'])))).T
> coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> data['D_Y'])))).T
> coor =&nbsp;coor_o.append(coor_d).drop_duplicates()
> coor.columns =&nbsp;['lng', 'lat']
> coor =&nbsp;coor.sort_index()
> coor =&nbsp;coor.to_numpy()
> # udf编写与注册
>
>
>
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &nbsp;&nbsp;&nbsp;&nbsp; DataTypes.ARRAY(DataTypes.FLOAT()),
> DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
> &nbsp;&nbsp;&nbsp; temp =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &nbsp;&nbsp;&nbsp; distance =&nbsp;2*np.arctan2(np.sqrt(temp),
> np.sqrt(1-temp))
> &nbsp;&nbsp;&nbsp; distance =&nbsp;distance*3958.8*1609.344
>
> &nbsp;&nbsp;&nbsp; buffer=100
> &nbsp;&nbsp;&nbsp; if&nbsp;(distance <=&nbsp;buffer).sum() &gt;&nbsp;0:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;distance.argmin()
> &nbsp;&nbsp;&nbsp; else:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;-1
> # 出行起点数据读取
>
> df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
> # 处理流程
> t_env.from_pandas(use_data) \
> &nbsp;&nbsp;&nbsp;&nbsp; .select("distance_meters(pickup_longitude,
> pickup_latitude)") \
> &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
> # 执行与计时
>
> start_time =&nbsp;time.time()
> t_env.execute("tutorial_job")
> print(time.time() -&nbsp;start_time)
> 我电脑的CPU为12核24线程
>
>
>
>
>
>
>
>
>
>
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> 期待您的回答,感谢!

回复