Hi, 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你
Best, Xingbo 洗你的头 <[email protected]> 于2020年10月26日周一 下午4:32写道: > 尊敬的开发者您好, > 我的需求是这样的, > 拥有数据: > 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id) > 需要做什么? > 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。 > 我现在的代码如下: > import pandas as pd > import numpy as np > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, DataTypes > from pyflink.table.descriptors import Schema, OldCsv, FileSystem > from pyflink.table.udf import udf > import os > import time > # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用) > > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env) > t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > '80m') > t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", > '100000') > # 输出表创建 > if os.path.exists('output'): > os.remove('output') > > t_env.connect(FileSystem().path('output')) \ > .with_format(OldCsv() > > .field('id', DataTypes.BIGINT())) \ > .with_schema(Schema() > > .field('id', DataTypes.BIGINT())) \ > .create_temporary_table('mySink') > # 交叉口经纬度数据读取 > data = pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv') > coor_o = pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], > data['O_Y'])))).T > coor_d = pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], > data['D_Y'])))).T > coor = coor_o.append(coor_d).drop_duplicates() > coor.columns = ['lng', 'lat'] > coor = coor.sort_index() > coor = coor.to_numpy() > # udf编写与注册 > > > > @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(), > DataTypes.ARRAY(DataTypes.FLOAT()), > DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT()) > def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]): > temp = (np.sin((lng2-lng1)/2*np.pi/180)**2+ > > +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2) > distance = 2*np.arctan2(np.sqrt(temp), > np.sqrt(1-temp)) > distance = distance*3958.8*1609.344 > > buffer=100 > if (distance <= buffer).sum() > 0: > return distance.argmin() > else: > return -1 > # 出行起点数据读取 > > df = pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv') > use_data = df[['pickup_longitude', 'pickup_latitude']] > # 处理流程 > t_env.from_pandas(use_data) \ > .select("distance_meters(pickup_longitude, > pickup_latitude)") \ > .insert_into('mySink') > # 执行与计时 > > start_time = time.time() > t_env.execute("tutorial_job") > print(time.time() - start_time) > 我电脑的CPU为12核24线程 > > > > > > > > > > > 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧.. > 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢) > 请问,我这种情况应该如何去提速呢?可否解释一下batch.size? > 期待您的回答,感谢!
