Hello, I think you are hitting https://issues.apache.org/jira/browse/KAFKA-4361, which is fixed in 0.10.1.1 and beyond.
Guozhang On Thu, Dec 7, 2017 at 12:49 AM, zhaoi...@163.com <zhaoi...@163.com> wrote: > Hello, > Working with kafka 0.10.1.0, I used these config > > val props = new Properties > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker) > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass) > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), > "latest") > > > > > but these code > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), > "latest") > does not work, what is the reason? > > > I read the code of org.apache.kafka.streams.StreamsConfig, there has some > code: > > > > private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; > > static > > { > > Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); > > tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, > "1000"); > > tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > "earliest"); > > tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > "false"); > > > > > CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap( > tempConsumerDefaultOverrides); > > } > > > > public Map<String, Object> getConsumerConfigs(StreamThread streamThread, > String groupId, String clientId) throws ConfigException { > > final Map<String, Object> consumerProps = > getClientPropsWithPrefix(CONSUMER_PREFIX, > ConsumerConfig.configNames()); > > > > > // disable auto commit and throw exception if there is user overridden > values, > > // this is necessary for streams commit semantics > > if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) > { > > throw new ConfigException("Unexpected user-specified consumer > config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG > > + ", as the streams client will always turn off auto > committing."); > > } > > > > > consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES); > > > > > // bootstrap.servers should be from StreamsConfig > > consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); > > // add client id with stream client id prefix, and group id > > consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); > > consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + > "-consumer"); > > > > > // add configs required for stream partition assignor > > consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, > streamThread); > > consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, > getInt(REPLICATION_FACTOR_CONFIG)); > > consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, > getInt(NUM_STANDBY_REPLICAS_CONFIG)); > > consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > StreamPartitionAssignor.class.getName()); > > > consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, > getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); > > if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) { > > consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > getString(ZOOKEEPER_CONNECT_CONFIG)); > > } > > > > > consumerProps.put(APPLICATION_SERVER_CONFIG, > getString(APPLICATION_SERVER_CONFIG)); > > return consumerProps; > > } > > > > > It will be use the CONSUMER_DEFAULT_OVERRIDES override the config of I > set? -- -- Guozhang