Hi Qi,

See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working version of a bucketing sink.
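The gist of it is a MapPartitionFunction that lazily opens one writer per bucket value it sees, so partitioning by the bucket key upstream bounds how many files each parallel instance has open at once. A minimal sketch of that pattern (the class and names below are illustrative, not the actual flink-utils API):

    import java.io.BufferedWriter;
    import java.io.OutputStreamWriter;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    // Illustrative sketch: records are (bucket key, line) pairs, and each
    // bucket key becomes one file under basePath. Each parallel instance
    // only opens writers for the keys it actually receives, so hash
    // partitioning on the key upstream keeps that count bounded.
    public class BucketingWriter implements MapPartitionFunction<Tuple2<String, String>, Void> {

        private final String basePath;

        public BucketingWriter(String basePath) {
            this.basePath = basePath;
        }

        @Override
        public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Void> out) throws Exception {
            Map<String, BufferedWriter> writers = new HashMap<>();
            try {
                for (Tuple2<String, String> record : records) {
                    BufferedWriter writer = writers.get(record.f0);
                    if (writer == null) {
                        // Open one file per bucket value, named after the key.
                        Path path = new Path(basePath, record.f0 + ".txt");
                        FileSystem fs = path.getFileSystem();
                        writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, WriteMode.OVERWRITE)));
                        writers.put(record.f0, writer);
                    }
                    writer.write(record.f1);
                    writer.newLine();
                }
            } finally {
                for (BufferedWriter writer : writers.values()) {
                    writer.close();
                }
            }
        }
    }

You’d wire it up with something like

    data.partitionByHash(0)
        .mapPartition(new BucketingWriter("/path/to/output"))
        .output(new DiscardingOutputFormat<Void>());

so the number of output files tracks the number of distinct keys, while the parallelism stays whatever you set on the environment.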
— Ken

> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com> wrote:
> 
> Hi Ken,
> 
> Agree. I will try partitionBy() to reduce the number of parallel sinks, and
> may also try sortPartition() so each sink could write files one by one.
> Looking forward to your solution. :)
> 
> Thanks,
> Qi
> 
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi Qi,
>> 
>>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Do you mean that I can create a batch sink which writes to N files?
>> 
>> Correct.
>> 
>>> That sounds viable, but since our data size is huge (billions of records &
>>> thousands of files), the performance may be unacceptable.
>> 
>> The main issue with performance (actually memory usage) is how many
>> OutputFormats you need to have open at the same time.
>> 
>> If you partition by the same key that’s used to define buckets, then the max
>> number is less, as each parallel instance of the sink only gets a unique
>> subset of all possible bucket values.
>> 
>> I’m actually dealing with something similar now, so I might have a solution
>> to share soon.
>> 
>> — Ken
>> 
>>> I will check Blink and give it a try anyway.
>>> 
>>> Thank you,
>>> Qi
>>> 
>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>> 
>>>> Hi Qi,
>>>> 
>>>> If I understand what you’re trying to do, then this sounds like a
>>>> variation of a bucketing sink.
>>>> 
>>>> That typically uses a field value to create a directory path or a file
>>>> name (though the filename case is only viable when the field is also
>>>> what’s used to partition the data).
>>>> 
>>>> But I don’t believe Flink has built-in support for that in batch mode
>>>> (see BucketingSink
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>> for streaming).
>>>> 
>>>> Maybe Blink has added that? Hoping someone who knows that codebase can
>>>> chime in here.
>>>> 
>>>> Otherwise you’ll need to create a custom sink to implement the desired
>>>> behavior - though abusing a MapPartitionFunction
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>> would be easiest, I think.
>>>> 
>>>> — Ken
>>>> 
>>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>> 
>>>>> Hi Ken,
>>>>> 
>>>>> Thanks for your reply. I may not have made myself clear: our problem is not
>>>>> about reading but rather writing.
>>>>> 
>>>>> We need to write to N files based on key partitioning. We have to use
>>>>> setParallelism() to set the output partition/file number, but when the
>>>>> partition number is too large (~100K), the parallelism would be too high.
>>>>> Is there any other way to achieve this?
>>>>> 
>>>>> Thanks,
>>>>> Qi
>>>>> 
>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>>> 
>>>>>> Hi Qi,
>>>>>> 
>>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>> 
>>>>>> If so, then instead you want to do something like:
>>>>>> 
>>>>>> Job job = Job.getInstance();
>>>>>> 
>>>>>> for each file…
>>>>>>     FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>>>> 
>>>>>> env.createInput(HadoopInputs.createHadoopInput(…, job));
>>>>>> 
>>>>>> Flink/Hadoop will take care of parallelizing the reads from the files,
>>>>>> given the parallelism that you’re specifying.
>>>>>> 
>>>>>> — Ken
>>>>>> 
>>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> We’re trying to distribute batch input data to (N) HDFS files,
>>>>>>> partitioning by hash, using the DataSet API. What I’m doing is like:
>>>>>>> 
>>>>>>> env.createInput(…)
>>>>>>>     .partitionByHash(0)
>>>>>>>     .setParallelism(N)
>>>>>>>     .output(…)
>>>>>>> 
>>>>>>> This works well for a small number of files. But when we need to
>>>>>>> distribute to a large number of files (say 100K), the parallelism becomes
>>>>>>> too large and we cannot afford that many TMs.
>>>>>>> 
>>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control
>>>>>>> the parallelism separately (using dynamic allocation). Is there
>>>>>>> anything similar in Flink, or another way we can achieve a similar result?
>>>>>>> Thank you!
>>>>>>> 
>>>>>>> Qi

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
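For anyone following the thread: a filled-out version of the multi-path createInput() snippet quoted above might look like the sketch below. It assumes Hadoop’s TextInputFormat and Flink’s flink-hadoop-compatibility module; the class name and the use of program arguments as the file list are placeholders.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MultiFileRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // A single Hadoop Job collects all the input paths, so one
            // createInput() call reads every file and Flink splits the
            // work across whatever parallelism the environment uses.
            Job job = Job.getInstance();
            for (String file : args) {
                FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file));
            }

            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
                HadoopInputs.createHadoopInput(new TextInputFormat(), LongWritable.class, Text.class, job));

            lines.print();
        }
    }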