??????????????????
??????????????????
??????????
??????????????????????????????????????????????13782492????????????????????????????????????4000??????????????????????id????0??????id??
????????????
??????????????????????????????????????????????????????????????id??????????????????????100m??????????????????????????100m????????-1??
??????????????????
import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
import os
import time
# Create the streaming environment (parallelism 1) and the table environment
# layered on top of it.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# Apply the configuration overrides one key at a time, in the listed order.
# NOTE(review): the second key sizes Arrow batches; whether it has any effect
# on a UDF declared with the plain @udf decorator should be confirmed against
# the PyFlink configuration reference.
for _option, _setting in (
    ("taskmanager.memory.task.off-heap.size", '80m'),
    ("python.fn-execution.arrow.batch.size", '100000'),
):
    t_env.get_config().get_configuration().set_string(_option, _setting)
# Delete an existing 'output' path, if any, before the sink is declared.
# os.remove only deletes regular files; it raises if 'output' is a directory.
if os.path.exists('output'):
    os.remove('output')

# Declare the temporary table 'mySink': a filesystem connector on 'output'
# using the OldCsv format, with one BIGINT column named 'id'.
_sink = t_env.connect(FileSystem().path('output'))
_sink = _sink.with_format(OldCsv().field('id', DataTypes.BIGINT()))
_sink = _sink.with_schema(Schema().field('id', DataTypes.BIGINT()))
_sink.create_temporary_table('mySink')
# Read the graph file and build the node-coordinate array used by the UDF.
data = pd.read_csv(r'D:\??????\????????\data\trip\graph_data.csv')


def _coords_by_id(ids, xs, ys):
    """Return a two-column frame of (x, y) pairs indexed by id.

    The pairs pass through a dict keyed by id, so when an id occurs more than
    once the last (x, y) pair seen for it is the one kept.
    """
    return pd.DataFrame(dict(zip(ids, zip(xs, ys)))).T


coor_o = _coords_by_id(data['O_ID'], data['O_X'], data['O_Y'])
coor_d = _coords_by_id(data['D_ID'], data['D_X'], data['D_Y'])

# pd.concat replaces DataFrame.append, which pandas deprecated in 1.4 and
# removed in 2.0. Row order is unchanged: origin rows, then destination rows.
# drop_duplicates() compares the coordinate values, not the index.
coor = pd.concat([coor_o, coor_d]).drop_duplicates()
coor.columns = ['lng', 'lat']
coor = coor.sort_index()
# Plain ndarray from here on: column 0 is 'lng', column 1 is 'lat', and rows
# are ordered by the sorted id index.
coor = coor.to_numpy()
# Python UDF: match one point against the node-coordinate array.
# NOTE(review): input_types lists four types, but the select() expression in
# this script passes only the two scalar columns and relies on the defaults
# for lng2/lat2 -- confirm this is accepted by the PyFlink version in use.
@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
                  DataTypes.ARRAY(DataTypes.FLOAT()),
                  DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
    """Return the position of the closest candidate within 100 m, or -1.

    Computes the haversine great-circle distance from the point
    (lng1, lat1) to every candidate (lng2[i], lat2[i]). All coordinates are
    treated as degrees (they are multiplied by pi/180 before use).

    Args:
        lng1: Longitude of the query point, in degrees.
        lat1: Latitude of the query point, in degrees.
        lng2: Candidate longitudes; defaults to column 0 of ``coor``.
        lat2: Candidate latitudes; defaults to column 1 of ``coor``.

    Returns:
        The positional index (into lng2/lat2) of the nearest candidate when
        at least one candidate lies within the 100 m buffer, otherwise -1.
    """
    to_rad = np.pi / 180
    half_dlat = (lat2 - lat1) / 2 * to_rad
    half_dlng = (lng2 - lng1) / 2 * to_rad

    # Haversine term. The latitude difference is the leading term and the
    # longitude difference is scaled by cos(lat1) * cos(lat2); the previous
    # code had the two axes swapped, which distorts every distance.
    hav = (np.sin(half_dlat) ** 2
           + np.cos(lat1 * to_rad) * np.cos(lat2 * to_rad) * np.sin(half_dlng) ** 2)
    # Rounding can push the term marginally outside [0, 1]; clamp it so that
    # sqrt(1 - hav) never produces NaN.
    hav = np.clip(hav, 0.0, 1.0)

    distance = 2 * np.arctan2(np.sqrt(hav), np.sqrt(1 - hav))
    # 3958.8 * 1609.344 is about 6.371e6: a sphere radius in miles converted
    # to metres, so `distance` is in metres.
    distance = distance * 3958.8 * 1609.344

    buffer = 100
    if (distance <= buffer).any():
        # Convert numpy.int64 to a plain int for the BIGINT result.
        return int(distance.argmin())
    return -1
# Read the trip records and keep only the two pickup-coordinate columns.
df = pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
use_data = df[['pickup_longitude', 'pickup_latitude']]

# A Python UDF must be registered under a name before a string expression can
# refer to it; without this the select() below cannot resolve the function.
t_env.register_function("distance_meters", distance_meters)

# Apply the UDF to every row and send the single result column to 'mySink'.
# The expression is kept on one line: a string literal split across lines is
# a syntax error.
t_env.from_pandas(use_data) \
    .select("distance_meters(pickup_longitude, pickup_latitude)") \
    .insert_into('mySink')
# Run the job named "tutorial_job" and print how many seconds the call to
# execute() took, measured with time.time().
start_time = time.time()
t_env.execute("tutorial_job")
elapsed_seconds = time.time() - start_time
print(elapsed_seconds)
????????CPU??12??24????
??????????????????????????????????2607????43????????????????????????????????????????????????????????????????1??????????10????????????????????????..
????????????????????????8????????????????????8??????????????????????????????????????????????????????????????????????
??????????????????????????????????????????????batch.size??
????????????????????