Hi:

  I designed my Ignite cluster as follows:
1. In the cluster, one role is the job submitter (client mode) and the other
role is the worker (server mode).
2. The job submitter submits an IgniteRunnable to the worker nodes via the
distributed Executor Service.
3. The IgniteRunnable is responsible for receiving and processing Kafka
messages.
4. In the client-side submit code, I add a shutdown hook that closes the Kafka
consumer inside the IgniteRunnable and calls executionService.shutdown(), so
that the remote IgniteRunnable is cancelled gracefully when the client node
exits.
5. All server nodes are started from the Ignite release binary package via
ignite.sh, and all custom code is packaged as a jar and added to the Ignite
libs dir. The client node is started from a Java main.
6. When I terminate the client process, the log shows that the shutdown hook
was called, the Kafka consumer close was called, and executionService.shutdown()
was called, but the remote IgniteRunnable is still running and processing Kafka
messages (my understanding of the shutdown() contract is in the sketch after
this list).
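My understanding of the ExecutorService contract (which I assume the Ignite
distributed executorService also follows) is that shutdown() only stops new
submissions and lets already-running tasks run to completion, while
shutdownNow() is what actually interrupts them, and the task itself has to
observe the interrupt. A minimal local sketch of that assumption, with a
hypothetical long-running loop standing in for my Kafka consumer runnable:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSemanticsSketch {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Hypothetical stand-in for the Kafka consumer runnable: it only
        // exits once it observes an interrupt.
        pool.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // poll / process messages here
            }
            System.out.println("task observed interrupt and exited");
        });

        pool.shutdown();
        // Times out: shutdown() does not interrupt the already-running task.
        pool.awaitTermination(2, TimeUnit.SECONDS);

        pool.shutdownNow();
        // Completes: shutdownNow() interrupts the task and the loop exits.
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }
}

Is that also how the remote IgniteRunnable is supposed to be cancelled, or does
Ignite need something more on top of shutdown()?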

My code looks like this:

1. Client submit code:
public class IgniteKafkaOrderPaymentCompleteStreamingJob extends IgniteBaseJob {

        private static final int INITIAL_COUNT = 5;

        private static final String PAYMENT_COMPLETED_DATA_PROCESSOR =
                        "paymentCompletedDataProcesser";

        public static void main(String[] args) {
                String springConfigProperty = getSpringPropertiesSuffix();
                ExecutorService executionService = createExecutionService(true,
                                IgniteJobConstants.IGNITE_CLUSTER_COMPUTE_ROLE);
                List<IgniteKafkaPaymentCompleteConsumer> consumers = new ArrayList<>();
                for (int i = 0; i < INITIAL_COUNT; i++) {
                        IgniteKafkaPaymentCompleteConsumer paymentCompleteStreamingConsumer =
                                        new IgniteKafkaPaymentCompleteConsumer();
                        paymentCompleteStreamingConsumer.setProcesserBeanName(PAYMENT_COMPLETED_DATA_PROCESSOR);
                        paymentCompleteStreamingConsumer.setDataProcesserClass(IOrderDataProcesser.class);
                        paymentCompleteStreamingConsumer.setConfigProperties(springConfigProperty);
                        consumers.add(paymentCompleteStreamingConsumer);
                        executionService.submit(paymentCompleteStreamingConsumer);
                }
                AddShutDownHock(executionService, consumers);
        }

}
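One thing I have not tried yet: keeping the Future returned by submit() and
cancelling it from the shutdown hook. If I read the ExecutorService contract
correctly, future.cancel(true) asks for the running task to be interrupted;
whether Ignite maps that onto cancelling the remote job is exactly what I am
unsure about. A sketch of the variant I have in mind (hypothetical, not my
current code; it would also need java.util.concurrent.Future imported):

                List<Future<?>> futures = new ArrayList<>();
                for (int i = 0; i < INITIAL_COUNT; i++) {
                        IgniteKafkaPaymentCompleteConsumer consumer =
                                        new IgniteKafkaPaymentCompleteConsumer();
                        // ... same setters as in main() above ...
                        consumers.add(consumer);
                        futures.add(executionService.submit(consumer));
                }

                // In the shutdown hook, before executionService.shutdown():
                for (Future<?> f : futures) {
                        f.cancel(true); // assumption: this interrupts/cancels the remote job
                }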

public abstract class IgniteBaseJob {

        public static IgniteLogger log;
        private static String SPRING_PROFILE_KEY = "spring.profile.active";

        /**
         * This method is used to create the execution service.
         * @param clientMode
         * @param roleInstance
         * @return
         */
        protected static ExecutorService createExecutionService(Boolean clientMode,
                        String roleInstance) {
                Ignition.setClientMode(clientMode);
                Ignite ignite = initializeIgniteContext("ignite-default.xml");
                IgniteCluster cluster = ignite.cluster();
                ClusterGroup worker = cluster.forAttribute(
                                IgniteJobConstants.IGNITE_CLUSTER_GROUP_KEY, roleInstance);
                return ignite.executorService(worker);
        }

        /**
         * This method is used to get the JVM parameter that indicates which
         * Spring profile is active.
         * @return
         */
        protected static String getSpringPropertiesSuffix() {
                String springActiveProfile = System.getProperty(SPRING_PROFILE_KEY);
                String springPropertiesSuffix = StringUtils.isBlank(springActiveProfile)
                                ? SpringPropertiesType.production.name() : springActiveProfile;
                return springPropertiesSuffix;
        }

        protected static Ignite initializeIgniteContext() {
                try {
                        Ignite ignite = Ignition.start(
                                        IgniteBaseJob.class.getClassLoader()
                                                        .getResourceAsStream("fds-ignite-develop.xml"));
                        log = Ignition.ignite().log();
                        return ignite;
                } catch (Exception e) {
                        e.printStackTrace();
                        log.error("Ignite context initialize error with the error details" + e);
                }
                return null;
        }

        /**
         * This method is used to initialize the Ignite context with the given
         * Ignite property file.
         * @param fileName
         * @return
         */
        protected static Ignite initializeIgniteContext(String fileName) {
                try {
                        Ignite ignite = Ignition.start(
                                        IgniteBaseJob.class.getClassLoader()
                                                        .getResourceAsStream(fileName));
                        if (log == null) {
                                log = Ignition.ignite().log();
                        }
                        return ignite;
                } catch (Exception e) {
                        e.printStackTrace();
                        log.error("Ignite context initialize error with the error details" + e);
                }
                return null;
        }

        /**
         * This method is used to add the shutdown hook.
         * @param executionService
         * @param consumers
         */
        protected static void AddShutDownHock(ExecutorService executionService,
                        List<? extends AbstractIgniteKafkaConsumer> consumers) {
                Runtime.getRuntime().addShutdownHook(new Thread() {
                        @Override
                        public void run() {
                                log.info("***********************Shutdownhock got executed");
                                for (AbstractIgniteKafkaConsumer consumer : consumers) {
                                        consumer.stop();
                                }
                                executionService.shutdown();
                                try {
                                        executionService.awaitTermination(5000, TimeUnit.MILLISECONDS);
                                } catch (InterruptedException e) {
                                        log.error("Error happens during shutdown of the distributed kafka streamer");
                                }
                        }
                });
        }

}
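One more doubt about AddShutDownHock above: the consumers it loops over are the
client-side instances, but the worker node executes a deserialized copy of the
submitted IgniteRunnable, so I suspect consumer.stop() never reaches the copy
that actually owns the running kafkaStreamer. A sketch of a cluster-wide stop
flag I am considering as a workaround (it assumes a replicated cache named
"streamer-control" that I would have to add; this is not existing code):

        // Client side, in the shutdown hook:
        IgniteCache<String, Boolean> control =
                        Ignition.ignite().getOrCreateCache("streamer-control");
        control.put("stopRequested", Boolean.TRUE);

        // Worker side, polled periodically inside the IgniteRunnable:
        IgniteCache<String, Boolean> workerControl =
                        Ignition.ignite().getOrCreateCache("streamer-control");
        if (Boolean.TRUE.equals(workerControl.get("stopRequested"))) {
                kafkaStreamer.stop();
        }

I would prefer not to need this if the executor service can already stop the
remote runnable for me.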

2. IgniteRunnable that consumes Kafka messages:

@Getter
@Setter
public class IgniteKafkaPaymentCompleteConsumer extends AbstractIgniteKafkaConsumer<OrderMessage> {

    public static final String PAYMENT_COMPLETE_STREAMING_EXECUTION_CONTEXT =
            "paymentCompleteStreamingExecutionContext";

    public IgniteKafkaPaymentCompleteConsumer() {
        this.kafkaStreamer = new IgniteKafkaStreamer<>();
        this.log = Ignition.ignite().log();
    }

    public void run() {
        try {
            log.info("Start to initialize the Payment Complete Ignite stream!");
            initKafkaConsumer(PAYMENT_COMPLETE_STREAMING_EXECUTION_CONTEXT);

            kafkaStreamer.setMultipleTupleExtractor(
                    new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
                        @Override
                        public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
                            try {
                                // Null-check the message before reading its payload.
                                if (msg != null) {
                                    String orderMsg = new String(msg.message(), "UTF-8");
                                    if (StringUtils.isNotBlank(orderMsg)) {
                                        log.debug("===================>order message:"
                                                + kafkaStreamer.getTopic() + ":::::::" + orderMsg);
                                        OrderMessage orderMessage =
                                                JsonUtils.json2Object(orderMsg, OrderMessage.class);
                                        orderMessage.getAdditionalProperties().put("OrderMessageType",
                                                OrderMessageType.PAYMENT_COMPLETE);
                                        getiDataProcesser().process(orderMessage, application);
                                    }
                                }
                            } catch (Exception ex) {
                                log.error("Error happens when try to extract the data from the order message kafka topic"
                                        + ExceptionUtils.getFullStackTrace(ex));
                            }
                            // The message was already processed above; nothing is returned to the streamer.
                            return null;
                        }
                    });

            kafkaStreamer.start();
            log.info("Ignite kafka stream started!");
        } catch (Exception e) {
            log.info("error happens during starting the ignite kafka stream:" + e);
        }
    }

}

@Getter
@Setter
public abstract class AbstractIgniteKafkaConsumer<T> implements IgniteRunnable {

        protected IgniteLogger log;
        protected IgniteKafkaStreamer<String, String> kafkaStreamer;
        protected ApplicationContext application;
        protected String configProperties;
        protected StreamingExecutionContext streamingExecutionContext;
        private IDataProcesser<T> iDataProcesser;
        private String avroSchemaFile;
        private Class<? extends IDataProcesser> dataProcesserClass;
        private String processerBeanName;
        private String topic;

        public IDataProcesser<T> getiDataProcesser() {
                return iDataProcesser;
        }

        public void setiDataProcesser() {
                this.iDataProcesser = SpringContextLoader.getBeanByNameWithType(application,
                                processerBeanName, dataProcesserClass);
        }

        /**
         * This method is used to compose the default consumer config.
         * @param streamingExecutionContext
         * @return
         */
        protected ConsumerConfig createDefaultConsumerConfig(
                        StreamingExecutionContext streamingExecutionContext) {
                Properties props = new Properties();
                props.put("zookeeper.connect", streamingExecutionContext.getZookeeperUrl());
                props.put("group.id", streamingExecutionContext.getGroupId());
                props.put("zookeeper.session.timeout.ms",
                                streamingExecutionContext.getZookerperSessionTimeOut());
                props.put("zookeeper.sync.time.ms", "200");
                props.put("auto.commit.interval.ms",
                                streamingExecutionContext.getAutoCommitInterval());
                return new ConsumerConfig(props);
        }

        /**
         * This method is used to initialize the kafka streamer.
         * @param executionContextBeanName
         */
        protected void initKafkaConsumer(String executionContextBeanName) {
                application = SpringContextLoader.getInstance().initialize(getConfigProperties());
                streamingExecutionContext = SpringContextLoader.getInstance().getBeanByName(
                                executionContextBeanName, StreamingExecutionContext.class);
                setiDataProcesser();
                Ignite ignite = Ignition.ignite();
                log = ignite.log();
                kafkaStreamer.setIgnite(ignite);
                kafkaStreamer.setTopic(streamingExecutionContext.getTopic());
                kafkaStreamer.setThreads(4);
                kafkaStreamer.setConsumerConfig(createDefaultConsumerConfig(streamingExecutionContext));
        }

        /**
         * This method is used to stop the kafka consumer when shutting down the ignite job.
         */
        public void stop() {
                if (kafkaStreamer != null) {
                        kafkaStreamer.stop();
                }
                log.info("Ignite kafka stream stopped!");
        }

}

Could you help me understand why the remote IgniteRunnable keeps running after
the client exits, and what the correct way to stop it is?

Thanks

James





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
