退订
退订 jimandlice jimandl...@163.com
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |
回复:flink如何正则读取hdfs下的文件
好的 谢谢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 19:42,Jingsong Li 写道: 1.11还没发布,文档还在编写中 Best, Jingsong Lee On Thu, May 21, 2020 at 7:33 PM jimandlice wrote: > 1.11的话 能提供一个demo么 > > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年05月21日 19:31,Jingsong Li 写道: > > 写入之后 还需要用脚本倒数据入hive么 > - 用Datastream来写,需要 > - 1.11的Table层来写,配置下就自动add partition到hive metastore了 > > Best, > Jingsong Lee > > On Thu, May 21, 2020 at 7:11 PM jimandlice wrote: > > > 写入之后 还需要用脚本倒数据入hive么 > > > > > > > > > > | | > > jimandlice > > | > > | > > 邮箱:jimandl...@163.com > > | > > > > Signature is customized by Netease Mail Master > > > > 在2020年05月21日 15:02,Jingsong Li 写道: > > 看起来是因为你客户端和Server端的依赖不一致导致的问题,你检查下客户端的jars? > > > > Best, > > Jingsong Lee > > > > On Thu, May 21, 2020 at 2:57 PM 阿华田 wrote: > > > > > public static void main(String[] args) throws Exception { > > > //初始化任务参数 > > > ExecutionEnvironment env = > > ExecutionEnvironment.getExecutionEnvironment(); > > > Job job = Job.getInstance(); > > > //自定义input读取hdfs > > > HadoopInputFormat hadoopIF = new > > > HadoopInputFormat( > > > new TextInputFormat(), LongWritable.class, Text.class, job); > > > //过滤需要读取的子目录 > > > ArrayList inputhPaths = > > > > > > HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*","2020-05-10","2020-05-20"); > > > TextInputFormat.setInputPaths(job, (Path[]) inputhPaths.toArray(new > > > Path[inputhPaths.size()])); > > > //自定义input的方式读取hdfs > > > DataSet> source = env.createInput(hadoopIF); > > > source.output(new HdfsTrainSinktest()); > > > env.execute("offline_train"); > > > } > > > 通过这种方式本地可以读取,提交到yarn上会报如下错误 > > > > > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: > Cannot > > > initialize task 'DataSource (at > > createInput(ExecutionEnvironment.java:552) > > > (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': > Loading > > > the input/output formats failed: > > > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > > > at > > > > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218) > > > at > > > > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > > > at > > > > > > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > > > at > > > > > > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > > > at > > > > > > org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176) > > > at > > > > > > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > > > at > > > > > > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) > > > at > > > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:265) > > > at > > > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > > > at > > > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > > > at > > > > > > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:146) > > > ... 10 more > > > Caused by: java.lang.Exception: Loading the input/output formats > failed: > > > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > > > at > > > > > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156) > > > at > > > > > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60) > > > at > > > > > > org.apache.flink.r
回复:flink如何正则读取hdfs下的文件
1.11的话 能提供一个demo么 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 19:31,Jingsong Li 写道: > 写入之后 还需要用脚本倒数据入hive么 - 用Datastream来写,需要 - 1.11的Table层来写,配置下就自动add partition到hive metastore了 Best, Jingsong Lee On Thu, May 21, 2020 at 7:11 PM jimandlice wrote: > 写入之后 还需要用脚本倒数据入hive么 > > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年05月21日 15:02,Jingsong Li 写道: > 看起来是因为你客户端和Server端的依赖不一致导致的问题,你检查下客户端的jars? > > Best, > Jingsong Lee > > On Thu, May 21, 2020 at 2:57 PM 阿华田 wrote: > > > public static void main(String[] args) throws Exception { > > //初始化任务参数 > > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > > Job job = Job.getInstance(); > > //自定义input读取hdfs > > HadoopInputFormat hadoopIF = new > > HadoopInputFormat( > > new TextInputFormat(), LongWritable.class, Text.class, job); > > //过滤需要读取的子目录 > > ArrayList inputhPaths = > > > HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*","2020-05-10","2020-05-20"); > > TextInputFormat.setInputPaths(job, (Path[]) inputhPaths.toArray(new > > Path[inputhPaths.size()])); > > //自定义input的方式读取hdfs > > DataSet> source = env.createInput(hadoopIF); > > source.output(new HdfsTrainSinktest()); > > env.execute("offline_train"); > > } > > 通过这种方式本地可以读取,提交到yarn上会报如下错误 > > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot > > initialize task 'DataSource (at > createInput(ExecutionEnvironment.java:552) > > (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': Loading > > the input/output formats failed: > > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > > at > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218) > > at > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > > at > > > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > > at > > > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > > at > > > org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176) > > at > > > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > > at > > > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) > > at > > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:265) > > at > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > > at > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > > at > > > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:146) > > ... 10 more > > Caused by: java.lang.Exception: Loading the input/output formats failed: > > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > > at > > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156) > > at > > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60) > > at > > > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214) > > ... 20 more > > Caused by: java.lang.RuntimeException: Deserializing the input/output > > formats failed: unread block data > > at > > > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.(InputOutputFormatContainer.java:68) > > at > > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153) > > ... 22 more > > Caused by: java.lang.IllegalStateException: unread block data > > at > > > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783) > > at > > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605) > > at > > java.io.ObjectInputStream.defaultR
回复:flink如何正则读取hdfs下的文件
写入之后 还需要用脚本倒数据入hive么 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 15:02,Jingsong Li 写道: 看起来是因为你客户端和Server端的依赖不一致导致的问题,你检查下客户端的jars? Best, Jingsong Lee On Thu, May 21, 2020 at 2:57 PM 阿华田 wrote: > public static void main(String[] args) throws Exception { > //初始化任务参数 > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > Job job = Job.getInstance(); > //自定义input读取hdfs > HadoopInputFormat hadoopIF = new > HadoopInputFormat( > new TextInputFormat(), LongWritable.class, Text.class, job); > //过滤需要读取的子目录 > ArrayList inputhPaths = > HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*","2020-05-10","2020-05-20"); > TextInputFormat.setInputPaths(job, (Path[]) inputhPaths.toArray(new > Path[inputhPaths.size()])); > //自定义input的方式读取hdfs > DataSet> source = env.createInput(hadoopIF); > source.output(new HdfsTrainSinktest()); > env.execute("offline_train"); > } > 通过这种方式本地可以读取,提交到yarn上会报如下错误 > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot > initialize task 'DataSource (at createInput(ExecutionEnvironment.java:552) > (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': Loading > the input/output formats failed: > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176) > at > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) > at > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:265) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:146) > ... 10 more > Caused by: java.lang.Exception: Loading the input/output formats failed: > org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0 > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214) > ... 20 more > Caused by: java.lang.RuntimeException: Deserializing the input/output > formats failed: unread block data > at > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.(InputOutputFormatContainer.java:68) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153) > ... 22 more > Caused by: java.lang.IllegalStateException: unread block data > at > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at > org.apache.flink.runtime.operators.util.T
回复:flink如何正则读取hdfs下的文件
flink 写入hive 使用api 思路是怎么的呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 10:57,阿华田 写道: flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
??????flink ????????join
??datastream | | jimandlice | | ??jimandl...@163.com | Signature is customized by Netease Mail Master ??2020??05??16?? 23:00??1048262223 ?? ??dataset api?? ---- ??: "jimandlice"
回复:flink 历史数据join
使用sql api的方式 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月16日 22:51,jimandlice 写道: 大佬 你好 ! kafka 写入hive 您这边demo么 我这边 只完成了hdfs 应该可以写到hive上去 能提供一个demo么 在 2020-05-15 19:41:59,"zhisheng" 写道: >看看 Flink UI 上 作业 task 的 sent 和 receive >的数据是否还在变更一般可以知道作业是否还在进行,等不动了,则意味着你这两个表固定的数据都已经 join 完了,等 checkpoint 也 >complete 完成了即可以停掉作业。 > >实在不放心,不知道啥时候跑完,可以晚上开始跑,第二天白天再去看看就好了 > >jimandlice 于2020年5月15日周五 下午7:38写道: > >> 是的 我想用datastrem 来做 join停的话 需要注意什么 >> >> >> >> >> | | >> jimandlice >> | >> | >> 邮箱:jimandl...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年05月15日 19:36,zhisheng 写道: >> 所以现在纠结的是使用 DataStream 还是 DataSet ? >> >> 可以使用 DataStream,作业 join 完了停掉作业就行了。 >> >> 小黑 于2020年5月15日周五 下午3:28写道: >> >> > >> > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 >> > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 >> > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 >> > >> > >> > >> > >> > >>
Re:Re: flink 历史数据join
大佬 你好 ! kafka 写入hive 您这边demo么 我这边 只完成了hdfs 应该可以写到hive上去 能提供一个demo么 在 2020-05-15 19:41:59,"zhisheng" 写道: >看看 Flink UI 上 作业 task 的 sent 和 receive >的数据是否还在变更一般可以知道作业是否还在进行,等不动了,则意味着你这两个表固定的数据都已经 join 完了,等 checkpoint 也 >complete 完成了即可以停掉作业。 > >实在不放心,不知道啥时候跑完,可以晚上开始跑,第二天白天再去看看就好了 > >jimandlice 于2020年5月15日周五 下午7:38写道: > >> 是的 我想用datastrem 来做 join停的话 需要注意什么 >> >> >> >> >> | | >> jimandlice >> | >> | >> 邮箱:jimandl...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年05月15日 19:36,zhisheng 写道: >> 所以现在纠结的是使用 DataStream 还是 DataSet ? >> >> 可以使用 DataStream,作业 join 完了停掉作业就行了。 >> >> 小黑 于2020年5月15日周五 下午3:28写道: >> >> > >> > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 >> > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 >> > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 >> > >> > >> > >> > >> > >>
回复:flink 历史数据join
好的 谢谢哈 我先试一试 有问题在和你说哈 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 19:41,zhisheng 写道: 看看 Flink UI 上 作业 task 的 sent 和 receive 的数据是否还在变更一般可以知道作业是否还在进行,等不动了,则意味着你这两个表固定的数据都已经 join 完了,等 checkpoint 也 complete 完成了即可以停掉作业。 实在不放心,不知道啥时候跑完,可以晚上开始跑,第二天白天再去看看就好了 jimandlice 于2020年5月15日周五 下午7:38写道: > 是的 我想用datastrem 来做 join停的话 需要注意什么 > > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master > > 在2020年05月15日 19:36,zhisheng 写道: > 所以现在纠结的是使用 DataStream 还是 DataSet ? > > 可以使用 DataStream,作业 join 完了停掉作业就行了。 > > 小黑 于2020年5月15日周五 下午3:28写道: > > > > > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 > > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 > > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 > > > > > > > > > > >
回复:flink 历史数据join
是的 我想用datastrem 来做 join停的话 需要注意什么 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 19:36,zhisheng 写道: 所以现在纠结的是使用 DataStream 还是 DataSet ? 可以使用 DataStream,作业 join 完了停掉作业就行了。 小黑 于2020年5月15日周五 下午3:28写道: > > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 > > > > >
回复:flink 历史数据join
2个不同源的历史数据同步 需要join 这个不是给开发者用的 是客户用的 客户只要选择2个数据源的2个表 join 结果保存 难道还要用sql来做么 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 15:39,shao.hongxiao 写道: 1. 搞hive映射表,直接使用spark或者hive sql | | 邵红晓 | | 邮箱:17611022...@163.com | 签名由网易邮箱大师定制 在2020年5月15日 15:31,jimandlice 写道: 如果要集成公司产品呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 15:30,shao.hongxiao 写道: 可以直接注册表,然后写sql来弄 | | 邵红晓 | | 邮箱:17611022...@163.com | 签名由 网易邮箱大师 定制 在2020年05月15日 13:17,jimandlice 写道: 就是要用api的方式来继承 不是直接操作sql那样来出来 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:38,jimandlice 写道: api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:34,Benchao Li 写道: 看起来就是一个异构数据源join的需求吧。 可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。 jimandlice 于2020年5月15日周五 上午11:16写道: 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
回复:flink 历史数据join
如果要集成公司产品呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 15:30,shao.hongxiao 写道: 可以直接注册表,然后写sql来弄 | | 邵红晓 | | 邮箱:17611022...@163.com | 签名由 网易邮箱大师 定制 在2020年05月15日 13:17,jimandlice 写道: 就是要用api的方式来继承 不是直接操作sql那样来出来 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:38,jimandlice 写道: api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:34,Benchao Li 写道: 看起来就是一个异构数据源join的需求吧。 可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。 jimandlice 于2020年5月15日周五 上午11:16写道: > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
回复:怎么排查taskmanager频繁挂掉的原因?
大佬 也看看我的问题呀 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 15:14,shao.hongxiao 写道: 你的是batch 模式吗 | | 邵红晓 | | 邮箱:17611022...@163.com | 签名由 网易邮箱大师 定制 在2020年05月15日 15:05,Jeff 写道: hi all, 最近上线一批新任务后taskmanager频繁挂掉,很可能是OOM问题,操作系统日志里没找到相关记录,flink日志只找到如下部分,但还是不确定是什么原因,请问要怎么确定原因呢? id, channel, rowtime) -> select: (appid, channel, rowtime, 1 AS $f3) b91d36766995398a9b0c9416ac1fb6bc. 2020-05-14 08:55:30,504 ERROR org.apache.flink.runtime.taskmanager.Task - Task did not exit gracefully within 180 + seconds. 2020-05-14 08:55:30,505 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Task did not exit gracefully within 180 + seconds. 2020-05-14 08:55:30,505 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Fatal error occurred while executing the TaskManager. Shutting it down... 2020-05-14 08:55:30,505 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0. 2020-05-14 08:55:30,508 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service. 2020-05-14 08:55:30,510 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager. 2020-05-14 08:55:30,512 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-c882cdf4-840c-4c62-800e-0ca3226be20b 2020-05-14 08:55:30,512 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components. 2020-05-14 08:55:30,514 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 2 ms). 2020-05-14 08:55:30,517 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 2 ms). 2020-05-14 08:55:30,545 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service. 2020-05-14 08:55:30,546 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /tmp/flink-dist-cache-3e0d4351-d5aa-4358-a028-9a59f398d92f 2020-05-14 08:55:30,550 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0. 2020-05-14 08:55:30,552 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache 2020-05-14 08:55:30,554 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache 2020-05-14 08:55:30,563 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service. 2020-05-14 08:55:30,566 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon. 2020-05-14 08:55:30,567 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports. 2020-05-14 08:55:30,570 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon. 2020-05-14 08:55:30,571 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports. 2020-05-14 08:55:30,571 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource switched from RUNNING to FAILED. java.lang.RuntimeException: segment has been freed at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamCalcRule$2658.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.coll
回复:flink 历史数据join
就是要用api的方式来继承 不是直接操作sql那样来出来 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:38,jimandlice 写道: api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:34,Benchao Li 写道: 看起来就是一个异构数据源join的需求吧。 可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。 jimandlice 于2020年5月15日周五 上午11:16写道: > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
回复:flink 历史数据join
api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月15日 11:34,Benchao Li 写道: 看起来就是一个异构数据源join的需求吧。 可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。 jimandlice 于2020年5月15日周五 上午11:16写道: > 先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 > 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 > > > > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > | > > Signature is customized by Netease Mail Master -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
flink 历史数据join
先工作上有一个需求 2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 是用datatream还是dataset 没有一个很好的 解决方案 望给与回复 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master