[jira] [Assigned] (FLINK-4045) Test AMQP connector using Apache ActiveMQ emebdded broker

2016-06-13 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-4045:
---

Assignee: Subhankar Biswas

> Test AMQP connector using Apache ActiveMQ emebdded broker
> -
>
> Key: FLINK-4045
> URL: https://issues.apache.org/jira/browse/FLINK-4045
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> Currently, our (RabbitMQ) AMQP client is not tested in any integration tests.
> Apache ActiveMQ implements an AMQP broker and they provide an embedded 
> implementation we can use for the integration tests: 
> http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-13 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327758#comment-15327758
 ] 

Subhankar Biswas commented on FLINK-4051:
-

OK i'll start working on it.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-12 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:43 AM:
--

nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
   lock.lock();
   try {
 E x;
 while ( (x = unlinkFirst()) == null)
 notEmpty.await();
 return x;
} finally {
lock.unlock();
}
{code}
{code:java}
//Code of Poll
 lock.lockInterruptibly();
 try {
   E x;
   while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
   }
 return x;
   } finally {
lock.unlock();
   }
{code}
So if the queue is empty the thread will be await mode. IMO a good 
implementation is to use the poll(timeout, unit).


was (Author: neo20iitkgp):
nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
   lock.lock();
   try {
 E x;
 while ( (x = unlinkFirst()) == null)
 notEmpty.await();
 return x;
} finally {
lock.unlock();
}
{code}
{code:java}
//Code of Poll
 lock.lockInterruptibly();
 try {
   E x;
   while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
   }
 return x;
   } finally {
lock.unlock();
   }
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-12 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:42 AM:
--

nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
   lock.lock();
   try {
 E x;
 while ( (x = unlinkFirst()) == null)
 notEmpty.await();
 return x;
} finally {
lock.unlock();
}
{code}
{code:java}
//Code of Poll
 lock.lockInterruptibly();
 try {
   E x;
   while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
   }
 return x;
   } finally {
lock.unlock();
   }
{code}
So if the queue is empty the thread will be await mode.


was (Author: neo20iitkgp):
nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-12 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:31 AM:
--

nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.


was (Author: neo20iitkgp):
 {code:java}nextDelivery(){code} use take() method of BlockingQueue, while 
nextDelivery(timeout) use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-12 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:30 AM:
--

 {code:java}nextDelivery(){code} use take() method of BlockingQueue, while 
nextDelivery(timeout) use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.


was (Author: neo20iitkgp):
 nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-12 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326810#comment-15326810
 ] 

Subhankar Biswas commented on FLINK-4051:
-

 nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-09 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-4051:
---

Assignee: Subhankar Biswas

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas updated FLINK-3769:

Comment: was deleted

(was: IMO user should be able to build different configuration for queue in 
both sink and source side. So if we can able to build some configuration for 
queue like this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once https://issues.apache.org/jira/browse/FLINK-3763 
will get merged.)

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas updated FLINK-3769:

Comment: was deleted

(was: IMO user should be able to build different configuration for queue in 
both sink and source side. So if we can able to build some configuration for 
queue like this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once https://issues.apache.org/jira/browse/FLINK-3763 
will get merged.)

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas updated FLINK-3769:

Comment: was deleted

(was: IMO user should be able to build different configuration for queue in 
both sink and source side. So if we can able to build some configuration for 
queue like this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once FLINK-3763 will get merged. 
)

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316252#comment-15316252
 ] 

Subhankar Biswas commented on FLINK-3769:
-

IMO user should be able to build different configuration for queue in both sink 
and source side. So if we can able to build some configuration for queue like 
this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once https://issues.apache.org/jira/browse/FLINK-3763 
will get merged.

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316250#comment-15316250
 ] 

Subhankar Biswas commented on FLINK-3769:
-

IMO user should be able to build different configuration for queue in both sink 
and source side. So if we can able to build some configuration for queue like 
this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once https://issues.apache.org/jira/browse/FLINK-3763 
will get merged.

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316242#comment-15316242
 ] 

Subhankar Biswas commented on FLINK-3769:
-

IMO user should be able to build different configuration for queue in both sink 
and source side. So if we can able to build some configuration for queue like 
this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once FLINK-3763 will get merged. 


> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-06-06 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316243#comment-15316243
 ] 

Subhankar Biswas commented on FLINK-3769:
-

IMO user should be able to build different configuration for queue in both sink 
and source side. So if we can able to build some configuration for queue like 
this connection configuration:
https://github.com/subhankarb/flink/blob/670e92d0731652563fd58631107e0f19c5d5d9a1/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
and then pass to sink and source and declare queue based on this config, user 
will have more options to pass configurations.
I will start work on this once FLINK-3763 will get merged. 


> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-30 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307250#comment-15307250
 ] 

Subhankar Biswas commented on FLINK-3763:
-

Hi [~rmetzger],
I am working on this. i'll raise the pull req today.

> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection, currently the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offer the URI as a valid constructor because 
> that would offer all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-04-15 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-3769:
---

Assignee: Subhankar Biswas

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-04-15 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-3763:
---

Assignee: Subhankar Biswas

> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection, currently the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offer the URI as a valid constructor because 
> that would offer all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-04-15 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas updated FLINK-3769:

Assignee: (was: Subhankar Biswas)

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3769) RabbitMQ Sink ability to publish to a different exchange

2016-04-15 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-3769:
---

Assignee: Subhankar Biswas

> RabbitMQ Sink ability to publish to a different exchange
> 
>
> Key: FLINK-3769
> URL: https://issues.apache.org/jira/browse/FLINK-3769
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>  Labels: rabbitmq
>
> The RabbitMQ Sink can currently only publish to the "default" exchange. This 
> exchange is a direct exchange, so the routing key will route directly to the 
> queue name. Because of this, the current sink will only be 1-to-1-to-1 (1 job 
> to 1 exchange which routes to 1 queue). Additionally, I believe that if a 
> user decides to use a different exchange I think the following can be assumed:
> 1.) The provided exchange exists
> 2.) The user has declared the appropriate mapping and the appropriate queues 
> exist in RabbitMQ (therefore, nothing needs to be created)
> RabbitMQ currently provides four types of exchanges. Three of these will be 
> covered by just enabling exchanges (Direct, Fanout, Topic) because they use 
> the routingkey (or nothing). 
> The fourth exchange type relies on the message headers, which are currently 
> set to null by default on the publish. These headers may be on a per message 
> level, so the input of this stream will need to take this as input as well. 
> This forth exchange could very well be outside of the scope of this 
> Improvement and a "RabbitMQ Sink enable headers" Improvement might be the 
> better way to go with this.
> Exchange Types: https://www.rabbitmq.com/tutorials/amqp-concepts.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3034) Redis SInk Connector

2016-02-01 Thread subhankar biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

subhankar biswas reassigned FLINK-3034:
---

Assignee: subhankar biswas

> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: subhankar biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3033) Redis Source Connector

2016-02-01 Thread subhankar biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

subhankar biswas reassigned FLINK-3033:
---

Assignee: subhankar biswas

> Redis Source Connector
> --
>
> Key: FLINK-3033
> URL: https://issues.apache.org/jira/browse/FLINK-3033
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: subhankar biswas
>Priority: Minor
>
> Flink does not provide a source connector for Redis.
> See FLINK-3034



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-01-28 Thread subhankar biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15122946#comment-15122946
 ] 

subhankar biswas commented on FLINK-3034:
-

i already finished the sink.



> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3033) Redis Source Connector

2016-01-28 Thread subhankar biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15122945#comment-15122945
 ] 

subhankar biswas commented on FLINK-3033:
-

i already finished writing the source.



> Redis Source Connector
> --
>
> Key: FLINK-3033
> URL: https://issues.apache.org/jira/browse/FLINK-3033
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Flink does not provide a source connector for Redis.
> See FLINK-3034



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)