Hi Team,

I am facing an issue with a custom Partitioner in Flink Streaming. I am using a file-watcher source to read files from a folder, and then I need to partition the records and send them to a sink.
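To make the shape of the job easier to see, here is a self-contained skeleton of what I am doing. Note this is only a sketch: it swaps my custom file-watcher source for fromElements and uses made-up "~"-delimited keys; the actual classes are pasted further down.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class PartitionerJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);            // the problem only shows up with parallelism > 1
        env.enableCheckpointing(10_000);  // checkpointing is enabled in the real job

        // Stand-in for the file-watcher source; these keys are made up for illustration.
        DataStream<String> input = env.fromElements("a~grp1~x", "b~grp2~y", "c~grp3~z");

        input.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return key.hashCode() % numPartitions;
                    }
                },
                new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value.split("~")[1];
                    }
                })
             .addSink(new PrintSinkFunction<String>());

        env.execute("partitioner-sketch");
    }
}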
A few observations:

- This only happens when parallelism > 1.
- Checkpointing is enabled.
- If I don't use the partitioner, everything works fine.
- If I use shuffle(), it is also fine.
- If I use an IdPartitioner, it is also fine (a rough sketch of what I mean is below).

Attaching logs: ParitionLogs.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/ParitionLogs.log>
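For reference, I have not pasted the IdPartitioner class in full; it is roughly the following (a sketch, not the exact class) -- it partitions on an already-numeric id instead of hashing a string key:

import org.apache.flink.api.common.functions.Partitioner;

// Rough sketch of the IdPartitioner referenced above (not the exact class):
// it uses the numeric key directly instead of hashing a string.
public class IdPartitioner implements Partitioner<Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public int partition(Integer key, int numPartitions) {
        return key % numPartitions;
    }
}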
*Main Class Code:*

@SuppressWarnings("Duplicates")
public class GroupContentScoring {

    private static final Logger LOGGER = LoggerFactory.getLogger(GroupContentScoring.class);
    private static final String CONFIG_DIR = "configDir";
    private static final String PROPERTIES = "properties";
    private static final String TYPES_TO_PROCESS = "typesToProcess";
    private static final String CONTENT_TYPE = "group";
    private static final String DECAY_SCORE_NAME = "decay-score-" + CONTENT_TYPE;

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String configDir = params.get(CONFIG_DIR);
        final String propertiesFile = params.get(PROPERTIES);
        final String[] typesToProcess = params.get(TYPES_TO_PROCESS, EMPTY).split(COMMA);

        Preconditions.checkArgument(StringUtils.isNotEmpty(configDir), "Empty config dir is specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(propertiesFile), "Empty properties file is specified");
        Preconditions.checkArgument(typesToProcess.length > 0, " No type is specified for processing");

        LOGGER.info("config directory for properties and config files : {} ", configDir);
        LOGGER.info("Content type to process are : {} ", Arrays.asList(typesToProcess));

        final String configFile = configDir + "/config.json";
        System.out.println("Config File " + configFile);

        final Configuration appConfigs =
                ParameterTool.fromPropertiesFile(new File(configDir + "/" + propertiesFile)).getConfiguration();
        final StreamExecutionEnvironment env = prepareStreamEnv(appConfigs);

        final CsConfig groupConfig = CsConfig.fromFile(CONTENT_TYPE, configFile);
        groupConfig.put("watchFromDate", appConfigs.getString("watchFromDate", ""));
        groupConfig.put("entityType", "CONTENT_GROUP");

        final DataStream<Event> input = prepareInput(env, groupConfig);
        final DataStream<Event> partitioned =
                input.partitionCustom(new EventPartitioner(), new CsEventKeySelector());
        partitioned.addSink(new PrintSinkFunction<>());

        env.execute();
    }

    private static DataStream<Event> prepareInput(StreamExecutionEnvironment env, CsConfig groupConfig) {
        HdfsConfig hdfsConf = new HdfsConfig();
        hdfsConf.setDir(groupConfig.get("dir").toString());
        hdfsConf.setWatchIntervalInSeconds(groupConfig.getStrAsLong("watchIntervalInSeconds"));
        hdfsConf.setWatchFilter(groupConfig.get("watchFilter").toString());
        hdfsConf.setFileProcessingMode(groupConfig.get("fileProcessingMode").toString());
        hdfsConf.setParallelism(env.getParallelism());
        hdfsConf.setMaxFiles(groupConfig.getStrAsInt("maxFiles"));
        hdfsConf.setWatchFromDate(groupConfig.get("watchFromDate").toString());
        hdfsConf.setWatcherLookupThresholdHour(groupConfig.getStrAsInt("watcherLookupThresholdHour"));
        hdfsConf.setModTimeDiffInSeconds(groupConfig.getStrAsLong("modTimeDiffInSeconds"));

        AvroFileWatcherSource avroSource = new AvroFileWatcherSource(hdfsConf);
        return avroSource.readStream(env, "group");
    }

    private static StreamExecutionEnvironment prepareStreamEnv(Configuration appConfigs) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final int checkPointInterval = appConfigs.getInteger("checkpoint.interval", 10_000);
        final int checkPointMinPause = appConfigs.getInteger("checkpoint.min.pause", 60_000);
        env.enableCheckpointing(checkPointInterval);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkPointMinPause);
        final StateBackend stateBackend = new FsStateBackend("hdfs:///new_data_pipeline/dev/phase2/");
        // env.setStateBackend(stateBackend);
        return env;
    }
}

*Partitioner:*

public class EventPartitioner implements Partitioner<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public int partition(String key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }
}

*Key Selector:*

public class CsEventKeySelector implements KeySelector<Event, String>, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(Event value) {
        final String[] split = value.getKey().split("~");
        return split[1];
    }
}

Thanks