??????????????????
??????????????????
??????????
??????????????????????????????????????????????13782492????????????????????????????????????4000??????????????????????id????0??????id??
????????????
??????????????????????????????????????????????????????????????id??????????????????????100m??????????????????????????100m????????-1??
??????????????????
import os
import shutil
import time

import numpy as np
import pandas as pd
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
# ??????????????????????????????1??batchsize??10??????????????????????????

# Streaming execution environment with every operator at parallelism 1,
# wrapped by a Table API environment.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# Runtime options for Python UDF execution, set on the table configuration.
table_conf = t_env.get_config().get_configuration()
# Off-heap memory reserved for task execution
# (NOTE(review): presumably sized for the Python worker — confirm).
table_conf.set_string("taskmanager.memory.task.off-heap.size", '80m')
# NOTE(review): the Arrow batch size is consulted by vectorized (Pandas)
# UDFs; a plain row-at-a-time @udf is likely unaffected by this setting.
table_conf.set_string("python.fn-execution.arrow.batch.size", '100000')
# Clear the result of a previous run before registering the sink
# (NOTE(review): the CSV sink presumably refuses to overwrite an existing path).
# At parallelism > 1 Flink typically writes 'output' as a directory of part
# files, which os.remove cannot delete, so both the file and directory cases
# are handled.
if os.path.isdir('output'):
    shutil.rmtree('output')
elif os.path.exists('output'):
    os.remove('output')

# Register a single-column CSV file sink 'mySink' holding one BIGINT 'id'
# per input row.
t_env.connect(FileSystem().path('output')) \
    .with_format(OldCsv()
                 .field('id', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('id', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')
# Build the graph-node coordinate table. Nodes appear as trip origins (O_*)
# and/or destinations (D_*), so both sides are stacked, indexed by node ID.
data = pd.read_csv(r'D:\??????\????????\data\trip\graph_data.csv')
coor_o = (data.set_index('O_ID')[['O_X', 'O_Y']]
          .rename(columns={'O_X': 'lng', 'O_Y': 'lat'}))
coor_d = (data.set_index('D_ID')[['D_X', 'D_Y']]
          .rename(columns={'D_X': 'lng', 'D_Y': 'lat'}))
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
coor = pd.concat([coor_o, coor_d])
# De-duplicate on node ID (the index), not on coordinate values: two distinct
# nodes that share coordinates must both be kept, otherwise every later row
# position shifts. The first occurrence of each ID wins.
coor = coor[~coor.index.duplicated(keep='first')]
coor = coor.sort_index()
# NOTE(review): the nearest-node UDF returns a row position in this array,
# which equals the node ID only if IDs are contiguous from 0 — confirm.
coor = coor.to_numpy()
# Nearest-node lookup UDF, evaluated once per input row.


# NOTE(review): the call site passes two arguments while four input types are
# declared (the last two correspond to the coordinate-array defaults). The
# declaration is left unchanged to keep the UDF's signature as registered.
# pandas float64 columns presumably arrive as DOUBLE rather than FLOAT — confirm.
@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
                  DataTypes.ARRAY(DataTypes.FLOAT()),
                  DataTypes.ARRAY(DataTypes.FLOAT())],
     result_type=DataTypes.BIGINT())
def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
    """Return the row position of the nearest node within 100 m, or -1.

    Distances are great-circle (haversine) distances on a sphere of radius
    3958.8 miles (about 6371 km).

    :param lng1: longitude of the query point, in degrees.
    :param lat1: latitude of the query point, in degrees.
    :param lng2: longitudes of all candidate nodes, in degrees
        (defaults to column 0 of the module-level ``coor`` array).
    :param lat2: latitudes of all candidate nodes, in degrees
        (defaults to column 1 of ``coor``).
    :return: index into ``lng2``/``lat2`` of the closest node when at least
        one node lies within 100 m; otherwise -1.
    """
    lng1_r, lat1_r = np.radians(lng1), np.radians(lat1)
    lng2_r, lat2_r = np.radians(lng2), np.radians(lat2)
    # Haversine: a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlng/2).
    # The cosine factors take LATITUDES, and the leading term is the
    # latitude difference.
    temp = (np.sin((lat2_r - lat1_r) / 2) ** 2
            + np.cos(lat1_r) * np.cos(lat2_r)
            * np.sin((lng2_r - lng1_r) / 2) ** 2)
    # Guard against floating-point overshoot outside [0, 1] before sqrt.
    temp = np.clip(temp, 0.0, 1.0)
    distance = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1 - temp))
    distance = distance * 3958.8 * 1609.344  # earth radius in miles -> metres

    buffer = 100  # search radius, metres
    if not (distance <= buffer).any():
        return -1
    # Hand Flink a plain Python int for the BIGINT result, not numpy.int64.
    return int(distance.argmin())
# Load the taxi trip records. Only the pickup coordinates are used, so read
# just those two columns to cut parse time and memory.
df = pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv',
                 usecols=['pickup_longitude', 'pickup_latitude'])
use_data = df[['pickup_longitude', 'pickup_latitude']]

# The select expression below refers to the UDF by name, so it must be
# registered with the table environment first.
t_env.register_function("distance_meters", distance_meters)

# Map each pickup point to its nearest node and write the result to 'mySink'.
t_env.from_pandas(use_data) \
    .select("distance_meters(pickup_longitude, pickup_latitude)") \
    .insert_into('mySink')

# Execute the job and print the elapsed wall-clock time in seconds.
start_time = time.perf_counter()
t_env.execute("tutorial_job")
print(time.perf_counter() - start_time)
????????CPU??12??24????










??????????????????????????????????2607????43????????????????????????????????????????????????????????????????1??????????10????????????????????????..
 
????????????????????????8????????????????????8??????????????????????????????????????????????????????????????????????
??????????????????????????????????????????????batch.size??
????????????????????

回复