[GitHub] storm issue #2098: STORM-2499: Add Serialization plugin for EventHub System ...

2017-05-12 Thread rban1
Github user rban1 commented on the issue:

https://github.com/apache/storm/pull/2098
  
@harshach Can you please merge it if the changes look good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2098: STORM-2499: Add Serialization plugin for EventHub System ...

2017-05-09 Thread rban1
Github user rban1 commented on the issue:

https://github.com/apache/storm/pull/2098
  
@harshach Thanks for the review. I have squashed commits into 1 and added a 
section in the README


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-04 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2098#discussion_r114857665
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
 ---
@@ -23,5 +23,6 @@
   public static final String Offset = "offset";
   public static final String Message = "message";
   public static final String META_DATA = "metadata";
+  public static final String SYSTEM_META_DATA = "systemmetadata";
--- End diff --

will change it to eventdata_system_properties


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-04 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2098#discussion_r114857534
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataSystemPropertiesScheme.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the raw 
bytes.
+ *
+ * The resulting tuple would contain two items, the first being the message
+ * bytes, and the second a map of properties that include System Metadata
--- End diff --

Will make this serialization scheme have both Properties and 
SystemProperties


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-03 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/2098

STORM-2499: Add Serialization plugin for EventHub System Properties

This PR adds a serialization scheme for eventhub systemproperties. There is 
also a bug fix in an existing serialization scheme which violates the schema 
setting.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm eventhub4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2098


commit db2aa61655edfe8efd43ed209fe647beb5e774fe
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-05-04T00:39:31Z

STORM-2499: Add Serialization plugin for EventHub System Properties




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2026: STORM-2371: Replace existing AMQP eventhub client with th...

2017-04-09 Thread rban1
Github user rban1 commented on the issue:

https://github.com/apache/storm/pull/2026
  
Made the necessary point changes and updated pom to use 0.13.1
@harshach  I have changed the PR name appropriately. Also rebased and I 
have only one commit in the commit history.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-04-02 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109327936
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -44,21 +38,32 @@
 public class StringEventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
+String messageData = "";
+if (eventData.getBytes()!=null) {
+  messageData = new String(eventData.getBytes());
+}
+/*Will only serialize AMQPValue type*/
+else if (eventData.getObject()!=null) {
+  try {
+if (!(eventData.getObject() instanceof List)) {
+  messageData = eventData.getObject().toString();
+} else {
+  throw new RuntimeException("Cannot serialize the given AMQP 
type.");
--- End diff --

will change it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-04-02 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109327926
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -44,27 +41,31 @@
 public class EventDataScheme implements IEventDataScheme {
 
private static final long serialVersionUID = 1L;
-
+   private static final Logger logger = 
LoggerFactory.getLogger(EventDataScheme.class);
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
+   if (eventData.getBytes()!=null) {
+   messageData = new String(eventData.getBytes());
+   }
+   /*Will only serialize AMQPValue type*/
+   else if (eventData.getObject()!=null) {
+   try {
+   if (!(eventData.getObject() instanceof List)) {
+   messageData = 
eventData.getObject().toString();
+   } else {
+   throw new RuntimeException("Cannot 
serialize the given AMQP type");
+   }
+   } catch (RuntimeException e) {
+   logger.error("Failed to serialize EventData 
payload class"
+   + 
eventData.getObject().getClass());
+   logger.error("Exception encountered while 
serializing EventData payload is"
+   + e.toString());
+   throw e;
}
}
-
+   Map metaDataMap = eventData.getProperties().size() > 0 ? 
eventData.getProperties() : null;
fieldContents.add(messageData);
fieldContents.add(metaDataMap);
--- End diff --

will remove them


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-04-02 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109327884
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
 ---
@@ -41,29 +41,29 @@ public PartitionManager(
 
 super(spoutConfig, partitionId, stateStore, receiver);
 
-this.pending = new LinkedHashMap<String, EventData>();
-this.toResend = new TreeSet();
+this.pending = new LinkedHashMap<String, EventDataWrap>();
+this.toResend = new TreeSet();
--- End diff --

it is indeed ordered by sequence number. The compareto definition in 
eventdatawrap class is as follows:
@Override
  public int compareTo(EventDataWrap ed) {
return messageId.getSequenceNumber().
compareTo(ed.getMessageId().getSequenceNumber());
  }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2026: Eventhub3

2017-03-29 Thread rban1
Github user rban1 commented on the issue:

https://github.com/apache/storm/pull/2026
  
@SreeramGarlapati  Made all the relavant changes(Serialization, exception 
handling, enqueuetimefilter)
@harshach I will be using this PR going forward. I have addressed your 
comments on syntax as well. On your comment of backporting the changes to 1.x 
branches it cannot be done as the latest eventhubclient is valid only from java 
8 environment. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-29 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r108799438
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,77 +57,81 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+  receiver = ehClient.createEpochReceiverSync(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1);
+}catch (Exception e){
+  logger.info("Exception in creating EventhubClient"+e.toString());
+}
 long end = System.currentTimeMillis();
 logger.info("created eventhub receiver, time taken(ms): " + 
(end-start));
   }
 
   @Override
-  public void close() {
+  public void close(){
 if(receiver != null) {
-  receiver.close();
+  try {
+receiver.close().whenComplete((voidargs,error)->{
+  try{
+if(error!=null){
+  logger.error("Exception during receiver close 
phase"+error.toString());
+}
+ehClient.closeSync();
+  }catch (Exception e){
+logger.error("Exception during ehclient close 
phase"+e.toString());
+  }
+}).get();
+  }catch (InterruptedException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }catch (ExecutionException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }
   logger.info("closed eventhub receiver: partitionId=" + partitionId );
   receiver = null;
+  ehClient =  null;
 }
   }
-  
+
+
   @Override
   public boolean isOpen() {
 return (receiver != null);
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventDataWrap receive() {
 long start = System.currentTimeMillis();
-Message message = receiver.receive(timeoutInMilliseconds);
+Iterable receivedEvents=null;
+/*Get one message at a time for backward compatibility behaviour*/
+try {
+  receivedEvents = receiver.receiveSync(1);
--- End diff --

Made the change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/2026

Eventhub3

Incorporating latest eventhub API changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm eventhub3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2026


commit 098272d753f406754ad17f4ba3ecd6a08881d82c
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-02-21T21:51:16Z

Implementing new eventhub driver

commit 756b3b8bba2acc455e571a5cdc2ea12c5f50c148
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-03-22T17:01:02Z

Storm 2371: Eventhub api changes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1998: Eventhub2

2017-03-13 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/1998

Eventhub2

latest changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm eventhub2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1998


commit 098272d753f406754ad17f4ba3ecd6a08881d82c
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-02-21T21:51:16Z

Implementing new eventhub driver

commit 2a603b34013ef0ab5f32aec6e213c610c58aa8d8
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-03-13T22:56:22Z

eventhub latest changes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1997: Eventhub2

2017-03-13 Thread rban1
Github user rban1 closed the pull request at:

https://github.com/apache/storm/pull/1997


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1997: Eventhub2

2017-03-13 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/1997

Eventhub2

Latest eventhub changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm eventhub2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1997


commit 098272d753f406754ad17f4ba3ecd6a08881d82c
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-02-21T21:51:16Z

Implementing new eventhub driver

commit 89e4cc7988fb2d4affb640382c2dbd5591837c53
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-03-13T22:32:43Z

EVenthub changes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-06 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104561334
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,77 +60,80 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+  receiver = ehClient.createEpochReceiverSync(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1);
--- End diff --

Will do


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-06 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104561293
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -46,25 +42,11 @@
private static final long serialVersionUID = 1L;
 
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
-   }
-   }
-
+   messageData = new String 
(eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset());
--- End diff --

Thanks. Is there any changes for now that you see in the serialization code 
that needs to be incorporated? Otherwise I will keep an eye for the 0.12.0 
version and then incorporate AmqpValue also as a payload


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-06 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r10456
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -46,19 +39,11 @@
   private static final long serialVersionUID = 1L;
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
-  }
-}
-
+String messageData = "";
+messageData = new String 
(eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset());
--- End diff --

will do


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104231241
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,77 +60,80 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+  receiver = ehClient.createEpochReceiverSync(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1);
+}catch (Exception e){
+  logger.info("Exception in creating EventhubClient"+e.toString());
+}
 long end = System.currentTimeMillis();
 logger.info("created eventhub receiver, time taken(ms): " + 
(end-start));
   }
 
   @Override
-  public void close() {
+  public void close(){
 if(receiver != null) {
-  receiver.close();
+  try {
+receiver.close().whenComplete((voidargs,error)->{
+  try{
+if(error!=null){
+  logger.error("Exception during receiver close 
phase"+error.toString());
+}
+ehClient.closeSync();
+  }catch (Exception e){
+logger.error("Exception during ehclient close 
phase"+e.toString());
+  }
+}).get();
+  }catch (InterruptedException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }catch (ExecutionException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }
   logger.info("closed eventhub receiver: partitionId=" + partitionId );
   receiver = null;
+  ehClient =  null;
 }
   }
+
   
   @Override
   public boolean isOpen() {
 return (receiver != null);
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventDataWrap receive() {
 long start = System.currentTimeMillis();
-Message message = receiver.receive(timeoutInMilliseconds);
+Iterable receivedEvents=null;
+/*Get one message at a time for backward compatibility behaviour*/
+try {
+  receivedEvents = receiver.receiveSync(1);
+}catch (Exception e){
+  logger.error("Exception occured during receive"+e.toString());
+}
 long end = System.currentTimeMillis();
 long millis = (end - start);
 receiveApiLatencyMean.update(millis);
 receiveApiCallCount.incr();
-
-if (message == null) {
-  //Temporary workaround for AMQP/EH bug of failing to receive messages
-  /*if(timeoutInMilliseconds > 100 && millis < 
timeoutInMilliseconds/2) {
-throw new RuntimeException(
-"Restart EventHubSpout due to failure of receiving messages in 
"
-+ millis + " millisecond");
-  }*/
+if (receivedEvents == null) {
--- End diff --

check for length also


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104228014
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 ---
@@ -18,14 +18,16 @@
 package org.apache.storm.eventhubs.bolt;
 
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
--- End diff --

order the imports


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread rban1
Github user rban1 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104227737
  
--- Diff: external/storm-eventhubs/pom.xml ---
@@ -95,6 +95,11 @@
 ${eventhubs.client.version}
 
 
+com.microsoft.azure
+azure-eventhubs
+0.11.0
--- End diff --

make it a property


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-02 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/1985

STORM-2371: New eventhub implementation

Made the changes with the latest comments

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm eventhub1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1985.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1985


commit e3545aa692673bd379d88d5009c2b8662a9817ec
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-03-03T03:28:49Z

STORM-2371: New eventhub implementation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-21 Thread rban1
GitHub user rban1 opened a pull request:

https://github.com/apache/storm/pull/1951

STORM-2371 Implementing new eventhub driver

Changing the undelying receiver implementation with the latest Eventhub 
receiver

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rban1/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1951


commit 098272d753f406754ad17f4ba3ecd6a08881d82c
Author: Ranjan Banerjee <raba...@microsoft.com>
Date:   2017-02-21T21:51:16Z

Implementing new eventhub driver




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---