Hi All,

I am trying to read data from Kafka and ingest it into Kudu using Spark Streaming. I am not using KuduContext to perform the upsert into Kudu; instead, I use Kudu's native client API to build a PartialRow and apply the operation for every record from Kafka. The Spark Streaming job runs fine and I can see the data in the Kudu tables. However, after a few batches have been processed, if I bring down the Kudu service, my executor becomes a zombie (execution never reaches my executor class again), and the internal threads that establish the connection to Kudu (which I do not manage in my code) throw exceptions that I am not able to handle, resulting in message loss. Please find the exception below:
18/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Also, my executor code (one of the map transformations in the lineage calls the class below), which establishes the connection to Kudu once per JVM at application start, is:

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.List;
import java.util.Map;

import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.dwh.streaming.kudu.sparkkudustreaming.config.LoadAppConf;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
import org.dwh.streaming.kudu.sparkkudustreaming.models.Store;

public class KuduProcess {
    private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
    private static final KuduProcess instance = new KuduProcess();
    private static KuduClient client;
    private static KuduTable table;
    private static KuduSession session;
    private static OperationResponse response;

    private KuduProcess() {
        try {
            Store store = LoadAppConf.loadAppConf();
            client = new KuduClient.KuduClientBuilder(store.getKuduHost()).build();
            table = client.openTable(store.getKuduTable());
            session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        } catch (KuduException e) {
            logger.error("Kudu Exception: " + e.getMessage());
        }
    }

    public static String upsertKudu(Map<String, Object> formattedMap) {
        if (formattedMap.size() != 0) {
            try {
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
                    if (entry.getValue().getClass().equals(String.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
                            row.setNull(entry.getKey());
                        else
                            row.addString(entry.getKey(), (String) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Long.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
                            row.setNull(entry.getKey());
                        else
                            row.addLong(entry.getKey(), (Long) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Integer.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
                            row.setNull(entry.getKey());
                        else
                            row.addInt(entry.getKey(), (Integer) entry.getValue());
                    }
                }
                session.apply(upsert);
                List<OperationResponse> responses = session.flush();
                for (OperationResponse r : responses) {
                    if (r.hasRowError()) {
                        RowError e = r.getRowError();
                        if ("ALREADY_PRESENT".equals(e.getErrorStatus())) {
                            continue;
                        }
                        logger.error("Error inserting " + e.getOperation().toString() + ": " + e.toString());
                    }
                }
            } catch (Exception e) {
                logger.error("Exception during upsert:", e);
            }
        }
        return "SUCCESS";
    }
}

Any suggestion on
handling this case so that I can avoid data loss would be much appreciated.
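One direction I am considering: since upsertKudu catches every exception, logs it, and still returns "SUCCESS", a failed flush is invisible to Spark, the task succeeds, and the Kafka offsets move past the lost records. Rethrowing after a bounded retry would instead fail the task so Spark reprocesses the batch. Below is a minimal, Kudu-agnostic sketch of such a retry wrapper; RetryHelper and its parameters are names I made up for illustration, not part of the Kudu or Spark API:

```java
import java.util.concurrent.Callable;

public class RetryHelper {
    // Retries an operation (e.g. a KuduSession flush) with exponential backoff.
    // If all attempts fail, the last exception is rethrown so the Spark task
    // fails and the batch is retried, instead of the error being swallowed
    // and the records silently lost.
    public static <T> T retryWithBackoff(Callable<T> op, int maxAttempts, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts; fall through and rethrow
                }
                Thread.sleep(delay);
                delay *= 2; // exponential backoff between attempts
            }
        }
        throw last; // propagate so the caller (the Spark task) fails visibly
    }
}
```

In upsertKudu this would wrap the apply/flush call, e.g. retryWithBackoff(() -> session.flush(), 5, 1000), with the catch block rethrowing rather than only logging.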