Hi Team,

I am facing an issue with a custom Partitioner in Flink streaming. I am
using a file watcher to read files from a folder, and I then need to
partition the records and send them to a sink.

- This happens only when parallelism > 1.
- Checkpointing is enabled.
- If I don't use the partitioner, everything works fine.
- If I use shuffle instead, it is also fine.
- If I use IdPartitioner, it is also fine (all three wirings are sketched below).
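
For reference, the three variants are wired roughly like this
(IdPartitioner is a separate partitioner class of mine, not included in
this post):

// Fails when parallelism > 1:
input.partitionCustom(new EventPartitioner(), new CsEventKeySelector());

// Both of these run fine:
input.shuffle();
input.partitionCustom(new IdPartitioner(), new CsEventKeySelector());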

Attaching logs: ParitionLogs.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/ParitionLogs.log>
  


*Main Class Code:*




@SuppressWarnings("Duplicates")
public class GroupContentScoring {

  private static final Logger LOGGER = LoggerFactory.getLogger(GroupContentScoring.class);
  private static final String CONFIG_DIR = "configDir";
  private static final String PROPERTIES = "properties";
  private static final String TYPES_TO_PROCESS = "typesToProcess";
  private static final String CONTENT_TYPE = "group";
  private static final String DECAY_SCORE_NAME = "decay-score-" + CONTENT_TYPE;

  public static void main(String[] args) throws Exception {

    final ParameterTool params = ParameterTool.fromArgs(args);
    final String configDir = params.get(CONFIG_DIR);
    final String propertiesFile = params.get(PROPERTIES);
    final String[] typesToProcess = params.get(TYPES_TO_PROCESS, EMPTY).split(COMMA);
    Preconditions.checkArgument(StringUtils.isNotEmpty(configDir), "Empty config dir is specified");
    Preconditions.checkArgument(StringUtils.isNotEmpty(propertiesFile), "Empty properties file is specified");
    Preconditions.checkArgument(typesToProcess.length > 0, "No type is specified for processing");

    LOGGER.info("config directory for properties and config files : {} ",
configDir);
    LOGGER.info("Content type to process are  : {} ",
Arrays.asList(typesToProcess));

    final String configFile = configDir + "/config.json";
    System.out.println("Config File " + configFile);

    final Configuration appConfigs = ParameterTool
        .fromPropertiesFile(new File(configDir + "/" + propertiesFile))
        .getConfiguration();
    final StreamExecutionEnvironment env = prepareStreamEnv(appConfigs);

    final CsConfig groupConfig = CsConfig.fromFile(CONTENT_TYPE, configFile);
    groupConfig.put("watchFromDate", appConfigs.getString("watchFromDate", ""));
    groupConfig.put("entityType", "CONTENT_GROUP");

    final DataStream<Event> input = prepareInput(env, groupConfig);

    final DataStream<Event> partitioned = input
        .partitionCustom(new EventPartitioner(), new CsEventKeySelector());

    partitioned.addSink(new PrintSinkFunction<>());

    env.execute();


  }



  private static DataStream<Event> prepareInput(StreamExecutionEnvironment
env, CsConfig groupConfig) {

    HdfsConfig hdfsConf = new HdfsConfig();
    hdfsConf.setDir(groupConfig.get("dir").toString());
    hdfsConf.setWatchIntervalInSeconds(groupConfig.getStrAsLong("watchIntervalInSeconds"));
    hdfsConf.setWatchFilter(groupConfig.get("watchFilter").toString());
    hdfsConf.setFileProcessingMode(groupConfig.get("fileProcessingMode").toString());
    hdfsConf.setParallelism(env.getParallelism());
    hdfsConf.setMaxFiles(groupConfig.getStrAsInt("maxFiles"));
    hdfsConf.setWatchFromDate(groupConfig.get("watchFromDate").toString());
    hdfsConf.setWatcherLookupThresholdHour(groupConfig.getStrAsInt("watcherLookupThresholdHour"));
    hdfsConf.setModTimeDiffInSeconds(groupConfig.getStrAsLong("modTimeDiffInSeconds"));

    AvroFileWatcherSource avroSource = new AvroFileWatcherSource(hdfsConf);
    return avroSource.readStream(env, "group");
  }

  private static StreamExecutionEnvironment prepareStreamEnv(Configuration appConfigs) {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final int checkPointInterval = appConfigs.getInteger("checkpoint.interval", 10_000);
    final int checkPointMinPause = appConfigs.getInteger("checkpoint.min.pause", 60_000);

    env.enableCheckpointing(checkPointInterval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkPointMinPause);
    final StateBackend stateBackend = new FsStateBackend("hdfs:///new_data_pipeline/dev/phase2/");
    // env.setStateBackend(stateBackend);
    return env;
  }
  }

}
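
As an aside, would a plain keyBy be the recommended way to spread these
records by key instead of partitionCustom? Something like this (just a
sketch of what I mean):

    final DataStream<Event> keyed = input.keyBy(new CsEventKeySelector());
    keyed.addSink(new PrintSinkFunction<>());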


*Partitioner:*

public class EventPartitioner implements Partitioner<String> {

  private static final long serialVersionUID = 1L;

  @Override
  public int partition(String key, int numPartitions) {
    return key.hashCode() % numPartitions;
  }
}
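
One thing I notice while re-reading this: String.hashCode() can be
negative in Java, and % keeps the sign of the left operand, so
partition() can return a negative index whenever numPartitions > 1. With
parallelism 1 it can never misbehave (x % 1 is always 0), which would
fit the "only when parallelism > 1" symptom above. Could this be the
cause? A non-negative variant would be:

public class EventPartitioner implements Partitioner<String> {

  private static final long serialVersionUID = 1L;

  @Override
  public int partition(String key, int numPartitions) {
    // Math.floorMod always returns a value in [0, numPartitions),
    // even when key.hashCode() is negative.
    return Math.floorMod(key.hashCode(), numPartitions);
  }
}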

*Key Selector:*

public class CsEventKeySelector implements KeySelector<Event, String>, Serializable {

  private static final long serialVersionUID = 1L;

  @Override
  public String getKey(Event value) {
    final String[] split = value.getKey().split("~");
    return split[1];
  }
}
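
Separately, I realize getKey assumes every key contains a "~"; if one
doesn't, split returns a single-element array and split[1] throws
ArrayIndexOutOfBoundsException inside the selector. (KeySelector already
extends Serializable, so the explicit implements clause isn't strictly
needed.) A defensive sketch:

public class CsEventKeySelector implements KeySelector<Event, String> {

  private static final long serialVersionUID = 1L;

  @Override
  public String getKey(Event value) {
    final String[] split = value.getKey().split("~");
    // Fall back to the whole key when it contains no '~' instead of
    // throwing ArrayIndexOutOfBoundsException on split[1].
    return split.length > 1 ? split[1] : split[0];
  }
}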



Thanks

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
