[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216551734 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java ## @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.RedeliveryTracker; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NonPersistentRedeliveryTracker implements RedeliveryTracker { Review comment: Ok This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216539139 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java ## @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.RedeliveryTracker; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NonPersistentRedeliveryTracker implements RedeliveryTracker { Review comment: Sure, so then maybe we can rename into `InMemoryRedeliveryTracker` to avoid confusion with this be associated with `NonPersistentTopic`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216539085 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java ## @@ -338,4 +340,9 @@ * @return consumer builder. */ ConsumerBuilder intercept(ConsumerInterceptor ...interceptors); + +/** + * Set dead letter policy for consumer Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216538878 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1184,6 +1239,40 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { +List> deadLetterMessages = null; Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216538381 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java ## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +public class DeadLetterPolicy { + +private final int maxRedeliverCount; + +private String deadLetterTopic; + + +public DeadLetterPolicy(int maxRedeliverCount) { +this.maxRedeliverCount = maxRedeliverCount; +} + +public DeadLetterPolicy(int maxRedeliverCount, String deadLetterTopic) { Review comment: Ok This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216538312 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java ## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; Review comment: Ok. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216538263 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java ## @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.RedeliveryTracker; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NonPersistentRedeliveryTracker implements RedeliveryTracker { Review comment: Because in the discuss of DLQ, has a solution is store the redelivery count in managed ledger, so i think it's possible to implement a PersistentRedeliveryTracker, so i named current implement to NonPersistentRedeliveryTracker. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #2550: [WIP] Fix status code of REST API to get list of subscriptions of partition…
merlimat closed pull request #2550: [WIP] Fix status code of REST API to get list of subscriptions of partition… URL: https://github.com/apache/incubator-pulsar/pull/2550 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index da51453f8a..9f5c2dfe83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -529,6 +529,12 @@ protected void internalDeleteTopic(boolean authoritative) { // subscriptions subscriptions.addAll(pulsar().getAdminClient().topics() .getSubscriptions(topicName.getPartition(0).toString())); +} catch (PulsarAdminException e) { +if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { +throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"); +} else { +throw new RestException(e); +} } catch (Exception e) { throw new RestException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c7c090fd44..187539fa90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -798,6 +798,16 @@ public void partitionedTopics(String topicName) throws Exception { fail(e.getMessage()); } +try { +admin.topics().getSubscriptions(partitionedTopicName); +fail("should have failed"); +} catch (PulsarAdminException e) { +// ok +assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); +} catch (Exception e) { +fail(e.getMessage()); +} + // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1; + ":" + BROKER_WEBSERVICE_PORT); PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Fix status code of REST API to get list of subscriptions of partitioned topic (#2550)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 24605d3 Fix status code of REST API to get list of subscriptions of partitioned topic (#2550) 24605d3 is described below commit 24605d328357e89ae107b549a6f0da2ce1683857 Author: massakam AuthorDate: Tue Sep 11 12:43:21 2018 +0900 Fix status code of REST API to get list of subscriptions of partitioned topic (#2550) --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ++ .../test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 10 ++ 2 files changed, 16 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index da51453..9f5c2df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -529,6 +529,12 @@ public class PersistentTopicsBase extends AdminResource { // subscriptions subscriptions.addAll(pulsar().getAdminClient().topics() .getSubscriptions(topicName.getPartition(0).toString())); +} catch (PulsarAdminException e) { +if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { +throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"); +} else { +throw new RestException(e); +} } catch (Exception e) { throw new RestException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c7c090f..187539f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -798,6 +798,16 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { fail(e.getMessage()); } +try { +admin.topics().getSubscriptions(partitionedTopicName); +fail("should have failed"); +} catch (PulsarAdminException e) { +// ok +assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); +} catch (Exception e) { +fail(e.getMessage()); +} + // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1; + ":" + BROKER_WEBSERVICE_PORT); PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216536439 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java ## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +public class DeadLetterPolicy { + +private final int maxRedeliverCount; + +private String deadLetterTopic; + + +public DeadLetterPolicy(int maxRedeliverCount) { +this.maxRedeliverCount = maxRedeliverCount; +} + +public DeadLetterPolicy(int maxRedeliverCount, String deadLetterTopic) { Review comment: Instead of providing constructor overloads, we could use interface and builder approach like in other parts of the API. That way the code will long more descriptive. Eg: ```java /// This DeadLetterPolicy.builder() .maxRedeliverCount(10) .deadLetterTopic("my-dql-topic") .build() // Instead of new DeadLetterPolicy(10, "my-dql-topic") ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216534839 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java ## @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.RedeliveryTracker; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NonPersistentRedeliveryTracker implements RedeliveryTracker { Review comment: Why is this tracker called "non-persistent"? Is it because the counter itself is not stored? In that case it's a bit confusing since there's the concept of non-persistent topics as well, which I think is orthogonal to the redelivery tracker. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216537418 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -265,7 +265,8 @@ public SendMessageInfo sendMessages(final List entries, SendListener list if (i == (entries.size() - 1)) { promise = writePromise; } -ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise); +int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())); Review comment: Instead of keeping a separate redelivery tracker, would it be possible to reuse the `pendingAcks` maps on the `Consumer` class? `pendingAcks` is a map for which the key is a pair longs and the value is a pair of longs as well. `(ledgerId, entryId) --> (batchSize, none)` As you can see, the second `long` field in the value is currently unused. We could keep the counter there. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216536492 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java ## @@ -338,4 +340,9 @@ * @return consumer builder. */ ConsumerBuilder intercept(ConsumerInterceptor ...interceptors); + +/** + * Set dead letter policy for consumer Review comment: Please expand the comment, explaining what are the purpose and scope of dead-letter policy (for people who are not familiar with the concept). Also it would be nice to provide here a quick code example that explain how to enable this feature. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216536479 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1184,6 +1239,40 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { +List> deadLetterMessages = null; Review comment: Can this new portion of code be factored out in a separate method that is called from here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216536925 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java ## @@ -282,10 +282,11 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) { } } -public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, ByteBuf metadataAndPayload) { +public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) { CommandMessage.Builder msgBuilder = CommandMessage.newBuilder(); msgBuilder.setConsumerId(consumerId); msgBuilder.setMessageId(messageId); +msgBuilder.setRedeliveryCount(redeliveryCount); Review comment: if the `redeliveryCount` is 0, which is the default in the proto definition, and it's passed when the feature is disabled, we should avoid setting the value on the protobuf builder. If that is omitted, it would save 2 bytes per each message. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216536431 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DeadLetterPolicy.java ## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; Review comment: I'd prefer to keep this class in `pulsar-client` package instead of `pulsar-common`, under the `org.apache.pulsar.client.api` package. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic
merlimat commented on a change in pull request #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#discussion_r216534889 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ## @@ -97,6 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.messagesToReplay = new ConcurrentLongPairSet(512, 2); +this.redeliveryTracker = new NonPersistentRedeliveryTracker(); Review comment: See above, why `NonPersistentRedeliveryTracker` ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure
rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure URL: https://github.com/apache/incubator-pulsar/pull/2554#discussion_r216526446 ## File path: deployment/terraform/packer/variables.json ## @@ -0,0 +1,11 @@ +{ + "pulsar_version": "2.1.0-incubating", Review comment: is it possible to not keeping it hard-coded? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure
sijie commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure URL: https://github.com/apache/incubator-pulsar/pull/2554#discussion_r216526415 ## File path: deployment/terraform/templates/pulsar.sh ## @@ -0,0 +1,3 @@ +#!/bin/bash + Review comment: @rdhabalia this is still wip :) I am placing a empty script and updating the init script one by one This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure
rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure URL: https://github.com/apache/incubator-pulsar/pull/2554#discussion_r216526325 ## File path: deployment/terraform/templates/pulsar.sh ## @@ -0,0 +1,3 @@ +#!/bin/bash + Review comment: this empty script is intentionally present? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure
rdhabalia commented on a change in pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure URL: https://github.com/apache/incubator-pulsar/pull/2554#discussion_r216526325 ## File path: deployment/terraform/templates/pulsar.sh ## @@ -0,0 +1,3 @@ +#!/bin/bash + Review comment: is it intentional to keep empty script here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie opened a new pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure
sijie opened a new pull request #2554: (WIP) [deployment] terraform scripts for deploying pulsar cluster to microsoft azure URL: https://github.com/apache/incubator-pulsar/pull/2554 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aahmed-se commented on issue #2504: Add Presto Sql Test
aahmed-se commented on issue #2504: Add Presto Sql Test URL: https://github.com/apache/incubator-pulsar/pull/2504#issuecomment-420105342 This is ready for review @jerrypeng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed pull request #2544: Readd MockBookKeeper to Pulsar
sijie closed pull request #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java new file mode 100644 index 00..f3689e91e2 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; +import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.OpenBuilderBase; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mocked version of BookKeeper client that keeps all ledgers data in memory. + * + * This mocked client is meant to be used in unit tests for applications using the BookKeeper API. + */ +public class PulsarMockBookKeeper extends BookKeeper { + +final ExecutorService executor; +final ZooKeeper zkc; + +@Override +public ZooKeeper getZkHandle() { +return zkc; +} + +@Override +public ClientConfiguration getConf() { +return super.getConf(); +} + +Map ledgers = new ConcurrentHashMap<>(); +AtomicLong sequence = new AtomicLong(3); + +CompletableFuture defaultResponse = CompletableFuture.completedFuture(null); +List> failures = new ArrayList<>(); + +public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception { +this.zkc = zkc; +this.executor = executor; +} + +@Override +public LedgerHandle createLedger(DigestType digestType, byte passwd[]) +throws BKException, InterruptedException { +return createLedger(3, 2, digestType, passwd); +} + +@Override +public LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte passwd[]) +throws BKException, InterruptedException { +return createLedger(ensSize, qSize, qSize, digestType, passwd); +} + +@Override +public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, final DigestType digestType, +final byte[] passwd, final CreateCallback cb, final Object ctx, Map properties) { +getProgrammedFailure().thenComposeAsync((res) -> { +try { +long id = sequence.getAndIncrement(); +log.info("Creating ledger {}", id); +PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); +ledgers.put(id, lh); +return FutureUtils.value(lh); +} catch (Throwable t) { +return FutureUtils.exception(t); +} +}, executor).whenCompleteAsync((lh, exception) -> { +if (exception != null) { +cb.createComplete(getExceptionCode(exception), null, ctx); +} else { +
[incubator-pulsar] branch master updated: Readd MockBookKeeper to Pulsar (#2544)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69571f8 Readd MockBookKeeper to Pulsar (#2544) 69571f8 is described below commit 69571f8ef67b99d66c7d405e17870100d0aa5080 Author: Ivan Kelly AuthorDate: Tue Sep 11 01:31:14 2018 +0200 Readd MockBookKeeper to Pulsar (#2544) These mocks were moved out of pulsar to the bookkeeper project a few months ago. While it is good to have mocks generally available for bookkeeper, if the mock is not in the pulsar code base and we want to change the behaviour of the mock for a specific case, we need to wait for a bookkeeper release cycle to do so. It's better to have the mock in Pulsar, so we can bend it to our needs. --- .../bookkeeper/client/PulsarMockBookKeeper.java| 284 + .../bookkeeper/client/PulsarMockLedgerHandle.java | 243 ++ .../bookkeeper/client/PulsarMockReadHandle.java| 125 + .../mledger/impl/ManagedLedgerErrorsTest.java | 4 +- .../bookkeeper/test/MockedBookKeeperTestCase.java | 6 +- .../broker/MockedBookKeeperClientFactory.java | 18 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 31 ++- .../PersistentDispatcherFailoverConsumerTest.java | 3 +- .../pulsar/broker/service/PersistentTopicTest.java | 3 +- .../pulsar/broker/service/ServerCnxTest.java | 3 +- .../impl/BlobStoreManagedLedgerOffloaderTest.java | 6 +- 11 files changed, 704 insertions(+), 22 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java new file mode 100644 index 000..f3689e9 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; +import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.OpenBuilderBase; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mocked version of BookKeeper client that keeps all ledgers data in memory. + * + * This mocked client is meant to be used in unit tests for applications using the BookKeeper API. + */ +public class PulsarMockBookKeeper extends BookKeeper { + +final ExecutorService executor; +final ZooKeeper zkc; + +@Override +public ZooKeeper getZkHandle() { +return zkc; +} + +@Override +public ClientConfiguration getConf() { +return super.getConf(); +} + +Map ledgers = new ConcurrentHashMap<>(); +AtomicLong sequence = new AtomicLong(3); + +CompletableFuture defaultResponse = CompletableFuture.completedFuture(null); +List> failures = new ArrayList<>(); + +public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception { +this.zkc = zkc; +this.executor = executor; +} + +@Override +public LedgerHandle createLedger(DigestType digestType, byte passwd[]) +throws BKException, InterruptedException { +return createLedger(3, 2, digestType, passwd); +} + +@Override +
[GitHub] cckellogg opened a new pull request #2553: Remove property term from dashboard.
cckellogg opened a new pull request #2553: Remove property term from dashboard. URL: https://github.com/apache/incubator-pulsar/pull/2553 This updates the html to display the new v2 naming. ### Motivation This brings the dashboard up to date with the new v2 naming. Property/properties are now tenant/tenants. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Fix dashboard to work with python3. (#2552)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new fc2d1d3 Fix dashboard to work with python3. (#2552) fc2d1d3 is described below commit fc2d1d3da7ffb8bb47e30ec49c07cf8d9f735246 Author: cckellogg AuthorDate: Mon Sep 10 13:33:29 2018 -0700 Fix dashboard to work with python3. (#2552) ### Motivation Make dashboard work with python 3. --- dashboard/django/collector.py | 10 +- dashboard/django/stats/views.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dashboard/django/collector.py b/dashboard/django/collector.py index 76c0646..c8c3b13 100755 --- a/dashboard/django/collector.py +++ b/dashboard/django/collector.py @@ -63,7 +63,7 @@ def fetch_broker_stats(cluster, broker_url, timestamp): def _fetch_broker_stats(cluster, broker_host_port, timestamp): broker_url = 'http://%s/' % broker_host_port -print 'Getting stats for %s' % broker_host_port +print('Getting stats for %s' % broker_host_port) broker, _ = Broker.objects.get_or_create( url = broker_host_port, @@ -262,7 +262,7 @@ def fetch_stats(): if cluster_name == 'global': continue cluster_url = get(args.serviceUrl, '/admin/clusters/' + cluster_name)['serviceUrl'] -print 'Cluster:', cluster_name, '->', cluster_url +print('Cluster:', cluster_name, '->', cluster_url) cluster, created = Cluster.objects.get_or_create(name=cluster_name) if cluster_url != cluster.serviceUrl: cluster.serviceUrl = cluster_url @@ -275,7 +275,7 @@ def fetch_stats(): f = pool.apply_async(fetch_broker_stats, (cluster, broker_host_port, timestamp)) futures.append(f) except Exception as e: -print 'ERROR: ', e +print('ERROR: ', e) pool.close() @@ -300,10 +300,10 @@ def purge_db(): Consumer.objects.filter(timestamp__lt = threshold).delete() def collect_and_purge(): -print '-- Starting stats collection' +print('-- Starting stats collection') fetch_stats() purge_db() -print '-- Finished collecting stats' +print('-- Finished collecting stats') if __name__ == "__main__": os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dashboard.settings") diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py index 426e9b1..9ab6859 100644 --- a/dashboard/django/stats/views.py +++ b/dashboard/django/stats/views.py @@ -56,7 +56,7 @@ def home(request): throughputOut = Sum('namespace__topic__msgThroughputOut'), ) -print properties.query +print(properties.query) properties = Table(request, properties, default_sort='name')
[GitHub] sijie closed pull request #2552: Fix dashboard to work with python3.
sijie closed pull request #2552: Fix dashboard to work with python3. URL: https://github.com/apache/incubator-pulsar/pull/2552 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/dashboard/django/collector.py b/dashboard/django/collector.py index 76c0646df9..c8c3b13cfc 100755 --- a/dashboard/django/collector.py +++ b/dashboard/django/collector.py @@ -63,7 +63,7 @@ def fetch_broker_stats(cluster, broker_url, timestamp): def _fetch_broker_stats(cluster, broker_host_port, timestamp): broker_url = 'http://%s/' % broker_host_port -print 'Getting stats for %s' % broker_host_port +print('Getting stats for %s' % broker_host_port) broker, _ = Broker.objects.get_or_create( url = broker_host_port, @@ -262,7 +262,7 @@ def fetch_stats(): if cluster_name == 'global': continue cluster_url = get(args.serviceUrl, '/admin/clusters/' + cluster_name)['serviceUrl'] -print 'Cluster:', cluster_name, '->', cluster_url +print('Cluster:', cluster_name, '->', cluster_url) cluster, created = Cluster.objects.get_or_create(name=cluster_name) if cluster_url != cluster.serviceUrl: cluster.serviceUrl = cluster_url @@ -275,7 +275,7 @@ def fetch_stats(): f = pool.apply_async(fetch_broker_stats, (cluster, broker_host_port, timestamp)) futures.append(f) except Exception as e: -print 'ERROR: ', e +print('ERROR: ', e) pool.close() @@ -300,10 +300,10 @@ def purge_db(): Consumer.objects.filter(timestamp__lt = threshold).delete() def collect_and_purge(): -print '-- Starting stats collection' +print('-- Starting stats collection') fetch_stats() purge_db() -print '-- Finished collecting stats' +print('-- Finished collecting stats') if __name__ == "__main__": os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dashboard.settings") diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py index 426e9b1b6e..9ab6859d11 100644 --- a/dashboard/django/stats/views.py +++ b/dashboard/django/stats/views.py @@ -56,7 +56,7 @@ def home(request): throughputOut = Sum('namespace__topic__msgThroughputOut'), ) -print properties.query +print(properties.query) properties = Table(request, properties, default_sort='name') This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed pull request #2546: Elastic connector
sijie closed pull request #2546: Elastic connector URL: https://github.com/apache/incubator-pulsar/pull/2546 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml new file mode 100644 index 00..1339b2f311 --- /dev/null +++ b/pulsar-io/elastic-search/pom.xml @@ -0,0 +1,91 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.pulsar +pulsar-io +2.2.0-incubating-SNAPSHOT + + pulsar-io-elastic-search + Pulsar IO :: ElasticSearch + + + +jcenter +https://jcenter.bintray.com/ + + + + + + + ${project.groupId} + pulsar-io-core + ${project.version} + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + org.apache.commons + commons-lang3 + 3.4 + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 6.3.2 + + + +net.andreinc.mockneat +mockneat +0.2.2 +test + + + + com.google.code.gson + gson + test + + + + + + +org.apache.nifi +nifi-nar-maven-plugin + + + + + \ No newline at end of file diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java new file mode 100644 index 00..3760d4072b --- /dev/null +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.elasticsearch; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; + +/** + * The base abstract class for ElasticSearch sinks. + * Users need to implement extractKeyValue function to use this sink. + * This class assumes that the input will be JSON documents + */ +public abstract class ElasticSearchAbstractSink implements Sink { + +protected static final String DOCUMENT = "doc"; + +private URL url; +private RestHighLevelClient client; +private CredentialsProvider credentialsProvider; +private ElasticSearchConfig elasticSearchConfig; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +elasticSearchConfig =
[incubator-pulsar] branch master updated: Elastic connector (#2546)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 928c3e1 Elastic connector (#2546) 928c3e1 is described below commit 928c3e1a9e1e1d27b3bf5bf816890d1316c4b5d2 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Mon Sep 10 13:31:49 2018 -0700 Elastic connector (#2546) ### Motivation Added a sink connector that writes JSON documents into ElasticSearch ### Modifications Added new pulsar-io module and associated integration tests ### Result An ElasticSearch sink connector will be available for use. --- pulsar-io/{hdfs => elastic-search}/pom.xml | 38 -- .../elasticsearch/ElasticSearchAbstractSink.java | 150 + .../io/elasticsearch/ElasticSearchConfig.java | 88 .../io/elasticsearch/ElasticSearchStringSink.java | 35 + .../pulsar/io/elasticsearch/package-info.java | 19 +++ .../resources/META-INF/services/pulsar-io.yaml | 22 +++ .../io/elasticsearch/ElasticSearchConfigTests.java | 125 + .../io/elasticsearch/ElasticSearchSinkTests.java | 144 .../pulsar/io/elasticsearch/data/Profile.java | 33 + .../pulsar/io/elasticsearch/data/UserProfile.java | 35 + .../src/test/resources/sinkConfig.yaml | 25 pulsar-io/hdfs/pom.xml | 1 + pulsar-io/pom.xml | 1 + tests/integration/pom.xml | 7 + .../containers/ElasticSearchContainer.java | 47 +++ .../integration/functions/PulsarFunctionsTest.java | 7 +- .../integration/io/ElasticSearchSinkTester.java| 75 +++ .../pulsar/tests/integration/io/SinkTester.java| 3 +- .../tests/integration/suites/PulsarTestSuite.java | 6 + 19 files changed, 851 insertions(+), 10 deletions(-) diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/elastic-search/pom.xml similarity index 70% copy from pulsar-io/hdfs/pom.xml copy to pulsar-io/elastic-search/pom.xml index 0d55207..1339b2f 100644 --- a/pulsar-io/hdfs/pom.xml +++ b/pulsar-io/elastic-search/pom.xml @@ -25,15 +25,24 @@ pulsar-io 2.2.0-incubating-SNAPSHOT - pulsar-io-hdfs + pulsar-io-elastic-search + Pulsar IO :: ElasticSearch + + +jcenter +https://jcenter.bintray.com/ + + + - + + ${project.groupId} pulsar-io-core ${project.version} - + com.fasterxml.jackson.core jackson-databind @@ -44,15 +53,28 @@ jackson-dataformat-yaml + + org.apache.commons + commons-lang3 + 3.4 + + - org.apache.hadoop - hadoop-client - 3.1.1 + org.elasticsearch.client + elasticsearch-rest-high-level-client + 6.3.2 + + +net.andreinc.mockneat +mockneat +0.2.2 +test + - org.testng - testng + com.google.code.gson + gson test diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java new file mode 100644 index 000..3760d40 --- /dev/null +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.elasticsearch; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import
[GitHub] sijie closed pull request #2545: [docker] introduce a pulsar standalone image
sijie closed pull request #2545: [docker] introduce a pulsar standalone image URL: https://github.com/apache/incubator-pulsar/pull/2545 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docker/pom.xml b/docker/pom.xml index 675656a199..bdc99f7297 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -38,5 +38,6 @@ pulsar grafana pulsar-all +pulsar-standalone diff --git a/docker/pulsar-standalone/Dockerfile b/docker/pulsar-standalone/Dockerfile new file mode 100644 index 00..869b4c0fb2 --- /dev/null +++ b/docker/pulsar-standalone/Dockerfile @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +FROM apachepulsar/pulsar-all:latest + +RUN apt-get update +RUN apt-get -y install postgresql sudo nginx supervisor + +# Python dependencies +RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests + +# Postgres configuration +COPY conf/postgresql.conf /etc/postgresql/9.6/main/ + +# Configure nginx and supervisor +RUN echo "daemon off;" >> /etc/nginx/nginx.conf +COPY conf/nginx-app.conf /etc/nginx/sites-available/default +COPY conf/supervisor-app.conf /etc/supervisor/conf.d/ + +# Copy web-app sources +COPY . /pulsar/ + +# Setup database and create tables +RUN sudo -u postgres /etc/init.d/postgresql start && \ +sudo -u postgres psql --command "CREATE USER docker WITH PASSWORD 'docker';" && \ +sudo -u postgres createdb -O docker pulsar_dashboard && \ +cd /pulsar/django && \ +./manage.py migrate && \ +sudo -u postgres /etc/init.d/postgresql stop + +# Collect all static files needed by Django in a +# single place. Needed to run the app outside the +# Django test web server +RUN cd /pulsar/django && ./manage.py collectstatic --no-input + +ENV SERVICE_URL http://127.0.0.1:8080 +EXPOSE 80 + +CMD ["supervisord", "-n"] diff --git a/docker/pulsar-standalone/conf/nginx-app.conf b/docker/pulsar-standalone/conf/nginx-app.conf new file mode 100644 index 00..6f57c1019c --- /dev/null +++ b/docker/pulsar-standalone/conf/nginx-app.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +upstream django { +server unix:/tmp/uwsgi.sock; +} + +server { +listen 80 default_server; + +charset utf-8; + +location /static { +alias /pulsar/django/static; +} + +location / { +uwsgi_pass django; +include /pulsar/conf/uwsgi_params; +} +} diff --git a/docker/pulsar-standalone/conf/postgresql.conf b/docker/pulsar-standalone/conf/postgresql.conf new file mode 100644 index 00..201dbba9da --- /dev/null +++ b/docker/pulsar-standalone/conf/postgresql.conf @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +#
[incubator-pulsar] branch master updated: [docker] introduce a pulsar standalone image (#2545)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2812fef [docker] introduce a pulsar standalone image (#2545) 2812fef is described below commit 2812fefb72aa8f7277a8a97d8842b0ad2dc6719b Author: Sijie Guo AuthorDate: Mon Sep 10 13:31:19 2018 -0700 [docker] introduce a pulsar standalone image (#2545) ## Motivation `pulsar` and `pulsar-all` are designed for running pulsar components on production. although it can be used for running standalone, people still need to run a separate docker container for pulsar dashboard. ## Changes introduce a `pulsar-standalone` image to package everything into one image. so people can run launch a pulsar standalone in one line command in docker, including dashboard. --- docker/pom.xml | 1 + docker/pulsar-standalone/Dockerfile| 55 docker/pulsar-standalone/conf/nginx-app.conf | 37 +++ docker/pulsar-standalone/conf/postgresql.conf | 38 +++ docker/pulsar-standalone/conf/supervisor-app.conf | 34 ++ docker/pulsar-standalone/conf/uwsgi.ini| 45 +++ docker/pulsar-standalone/conf/uwsgi_params | 16 + docker/pulsar-standalone/django/collector.py | 345 + docker/pulsar-standalone/django/collector.sh | 23 ++ .../pulsar-standalone/django/dashboard/__init__.py | 19 ++ .../pulsar-standalone/django/dashboard/settings.py | 181 +++ docker/pulsar-standalone/django/dashboard/urls.py | 45 +++ docker/pulsar-standalone/django/dashboard/wsgi.py | 35 +++ docker/pulsar-standalone/django/manage.py | 41 +++ docker/pulsar-standalone/django/stats/__init__.py | 19 ++ docker/pulsar-standalone/django/stats/admin.py | 34 ++ docker/pulsar-standalone/django/stats/apps.py | 26 ++ .../django/stats/migrations/0001_initial.py| 221 + .../django/stats/migrations/__init__.py| 20 ++ docker/pulsar-standalone/django/stats/models.py| 200 .../django/stats/templates/stats/base.html | 98 ++ .../django/stats/templates/stats/broker.html | 71 + .../django/stats/templates/stats/brokers.html | 90 ++ .../django/stats/templates/stats/clusters.html | 106 +++ .../django/stats/templates/stats/home.html | 74 + .../django/stats/templates/stats/namespace.html| 96 ++ .../django/stats/templates/stats/property.html | 72 + .../django/stats/templates/stats/topic.html| 197 .../django/stats/templates/stats/topics.html | 91 ++ .../django/stats/templatetags/__init__.py | 19 ++ .../django/stats/templatetags/stats_extras.py | 64 .../django/stats/templatetags/table.py | 91 ++ docker/pulsar-standalone/django/stats/tests.py | 22 ++ docker/pulsar-standalone/django/stats/urls.py | 37 +++ docker/pulsar-standalone/django/stats/views.py | 283 + docker/pulsar-standalone/pom.xml | 81 + site2/docs/getting-started-standalone.md | 27 +- .../getting-started-standalone.md | 14 +- 38 files changed, 2957 insertions(+), 11 deletions(-) diff --git a/docker/pom.xml b/docker/pom.xml index 675656a..bdc99f7 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -38,5 +38,6 @@ pulsar grafana pulsar-all +pulsar-standalone diff --git a/docker/pulsar-standalone/Dockerfile b/docker/pulsar-standalone/Dockerfile new file mode 100644 index 000..869b4c0 --- /dev/null +++ b/docker/pulsar-standalone/Dockerfile @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +FROM apachepulsar/pulsar-all:latest + +RUN apt-get update +RUN apt-get -y install postgresql sudo nginx supervisor + +# Python dependencies +RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests + +# Postgres configuration +COPY conf/postgresql.conf /etc/postgresql/9.6/main/ + +# Configure nginx and
[GitHub] ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar
ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544#issuecomment-420023164 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aahmed-se commented on issue #2495: Trigger function not update function metrics correctly
aahmed-se commented on issue #2495: Trigger function not update function metrics correctly URL: https://github.com/apache/incubator-pulsar/issues/2495#issuecomment-420006976 This issue is resolves This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aahmed-se closed issue #2495: Trigger function not update function metrics correctly
aahmed-se closed issue #2495: Trigger function not update function metrics correctly URL: https://github.com/apache/incubator-pulsar/issues/2495 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg opened a new pull request #2552: Fix dashboard to work with python3.
cckellogg opened a new pull request #2552: Fix dashboard to work with python3. URL: https://github.com/apache/incubator-pulsar/pull/2552 ### Motivation Make dashboard work with python 3. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2508: PIP-22: Dead Letter Topic
sijie commented on issue #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#issuecomment-420002665 ping @rdhabalia @merlimat This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2538: [io] jdbc connector uses Schema.AUTO
sijie commented on issue #2538: [io] jdbc connector uses Schema.AUTO URL: https://github.com/apache/incubator-pulsar/pull/2538#issuecomment-420001703 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2545: [docker] introduce a pulsar standalone image
sijie commented on issue #2545: [docker] introduce a pulsar standalone image URL: https://github.com/apache/incubator-pulsar/pull/2545#issuecomment-420001590 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…
sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… URL: https://github.com/apache/incubator-pulsar/pull/2543#issuecomment-42852 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2526: [dev] provide a python merge script for merging pull requests
maskit commented on a change in pull request #2526: [dev] provide a python merge script for merging pull requests URL: https://github.com/apache/incubator-pulsar/pull/2526#discussion_r216286359 ## File path: dev/pulsar-merge-pr.py ## @@ -0,0 +1,699 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Utility for creating well-formed pull request merges and pushing them to Apache. This script is a modified version +# of the one created by the BookKeeper project (https://github.com/apache/bookkeeper/blob/master/dev/bk-merge-pr.py). +# +# Usage: ./pulsar-merge-pr.py (see config env vars below) +# +# This utility assumes you already have local a pulsar git folder and that you +# have added remotes corresponding to the github apache pulsar repo. + +import json +import os +import re +import subprocess +import sys +import urllib2 + +PROJECT_NAME = "incubator-pulsar" + +CAPITALIZED_PROJECT_NAME = "pulsar".upper() +GITHUB_ISSUES_NAME = "issue".upper() + +# Location of the local git repository +REPO_HOME = os.environ.get("%s_HOME" % CAPITALIZED_PROJECT_NAME, os.getcwd()) +# Remote name which points to the GitHub site +PR_REMOTE_NAME = os.environ.get("PR_REMOTE_NAME", "apache") +# Remote name which points to Apache git +PUSH_REMOTE_NAME = os.environ.get("PUSH_REMOTE_NAME", "apache") +# Reference branch name +DEV_BRANCH_NAME = os.environ.get("DEV_BRANCH_NAME", "master") +# Github API page size +GITHUB_PAGE_SIZE = os.environ.get("GH_PAGE_SIZE", "100") +# OAuth key used for issuing requests against the GitHub API. If this is not defined, then requests +# will be unauthenticated. You should only need to configure this if you find yourself regularly +# exceeding your IP's unauthenticated request rate limit. You can create an OAuth key at +# https://github.com/settings/tokens. This script only requires the "public_repo" scope. +GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY") + +GITHUB_USER = os.environ.get("GITHUB_USER", "apache") +GITHUB_BASE = "https://github.com/%s/%s/pull; % (GITHUB_USER, PROJECT_NAME) +GITHUB_API_URL = "https://api.github.com; +GITHUB_API_BASE = "%s/repos/%s/%s" % (GITHUB_API_URL, GITHUB_USER, PROJECT_NAME) +# Prefix added to temporary branches +TEMP_BRANCH_PREFIX = "PR_TOOL" +RELEASE_BRANCH_PREFIX = "branch-" + +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.9.1.0") + +def get_json(url, preview_api = False): +try: +request = urllib2.Request(url) +if GITHUB_OAUTH_KEY: +request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY) +if preview_api: +request.add_header('Accept', 'application/vnd.github.black-cat-preview+json') +return json.load(urllib2.urlopen(request)) +except urllib2.HTTPError as e: +if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0': +print "Exceeded the GitHub API rate limit; see the instructions in " + \ + "pulsar-merge-pr.py to configure an OAuth token for making authenticated " + \ + "GitHub requests." +else: +print "Unable to fetch URL, exiting: %s" % url +sys.exit(-1) + +def post_json(url, data): +try: +request = urllib2.Request(url, data, { 'Content-Type': 'application/json' }) +if GITHUB_OAUTH_KEY: +request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY) +return json.load(urllib2.urlopen(request)) +except urllib2.HTTPError as e: +if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0': +print "Exceeded the GitHub API rate limit; see the instructions in " + \ + "pulsar-merge-pr.py to configure an OAuth token for making authenticated " + \ + "GitHub requests." +else: +print "Unable to fetch URL, exiting: %s - %s" % (url, e) +sys.exit(-1) + +def put_json(url, data): +try: +request = urllib2.Request(url, data, { 'Content-Type': 'application/json' }) +request.get_method = lambda: 'PUT' +if GITHUB_OAUTH_KEY: +request.add_header('Authorization', 'token %s' %
[GitHub] ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar
ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544#issuecomment-419873470 @maskit sure, will change it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar
ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544#issuecomment-419849442 rerun integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui removed a comment on issue #2551: Client switch connection between different pulsar cluster.
codelipenghui removed a comment on issue #2551: Client switch connection between different pulsar cluster. URL: https://github.com/apache/incubator-pulsar/issues/2551#issuecomment-419849208 #2543 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar
ivankelly commented on issue #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544#issuecomment-419849377 @maskit it does need to be in the same package because it's not a clean mock like we would have if just mocking an interface. It relies on some package private members of the classes it is overriding unfortunately. If we can move managed ledger to completely using the new bookkeeper apis, then the mocks can go in whatever package we want. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on issue #2551: Client switch connection between different pulsar cluster.
codelipenghui commented on issue #2551: Client switch connection between different pulsar cluster. URL: https://github.com/apache/incubator-pulsar/issues/2551#issuecomment-419849208 #2543 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui opened a new issue #2551: Client switch connection between different pulsar cluster.
codelipenghui opened a new issue #2551: Client switch connection between different pulsar cluster. URL: https://github.com/apache/incubator-pulsar/issues/2551 ## It's necessary to add a mechanism to support client switch connection between different pulsar cluster. ### Motivation Generally, we can use DNS or TCP proxy to improve broker availability when you have many pulsar cluster and one cluster is not available(such as network error). But in this solution, we need a backup cluster, the backup cluster has the same configurations. And in this solution have a problem with only can close connection by broker(shut down the broker or setting the network or offload namespace) to trigger client reconnect. So i have a idea, we can trigger client reconnect by pulsar client. ### Implement By now, we can create a pulsar client use service url. We can add a method named serviceUrlProvider to create a pulsar client. ```java public interface ServiceUrlProvider { /** * Get pulsar service url from ServiceUrlProvider. * * @return pulsar service url. */ String getServiceUrl(); /** * Set pulsar client to the provider for provider can control the pulsar client, * such as {@link PulsarClient#forceCloseConnection()} or {@link PulsarClient#close()}. * * @param client created pulsar client. */ void setClient(PulsarClient client); } ``` We can create custom provider by ServiceUrlProvider such as ZookeeperServiceUrlProvider. ZookeeperServiceUrlProvider can watch zookeeper node update event then trigger pulsar client to reconnect the new service url. We can add method like this: ```java public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientException { this.getPulsarClient().getConf().setServiceUrl(newServiceUrl); this.getPulsarClient().reloadLookUp(); this.getPulsarClient().forceCloseConnection(); } ``` In this solution we switch client connection simpler and no need to do any thing in pulsar broker. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam opened a new pull request #2550: Fix status code of REST API to get list of subscriptions of partition…
massakam opened a new pull request #2550: Fix status code of REST API to get list of subscriptions of partition… URL: https://github.com/apache/incubator-pulsar/pull/2550 …ed topic Applied the same change as https://github.com/apache/incubator-pulsar/pull/2542 to the entry point for getting a list of subscriptions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia opened a new pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg
rdhabalia opened a new pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549 ### Motivation As discussed at #2438 and #2474, we would like to fix function assignment approach such a way that function-worker can support large number of function instances in the cluster. ### Modifications - using compacted topic for assignment - publish individual message for each new assignment ### Result Function-worker will be able to scale and performs well with large number of functions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on issue #2535: Add ledger op timeout to avoid topics stuck on ledger-creation
rdhabalia commented on issue #2535: Add ledger op timeout to avoid topics stuck on ledger-creation URL: https://github.com/apache/incubator-pulsar/pull/2535#issuecomment-419817615 @merlimat can you please review it as we want to use this patch in our current deployed-release. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui commented on a change in pull request #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…
codelipenghui commented on a change in pull request #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… URL: https://github.com/apache/incubator-pulsar/pull/2543#discussion_r216212676 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java ## @@ -706,6 +706,58 @@ public void shutdown() throws PulsarClientException { } } +@Override +public void forceCloseConnection() { +if (this.producers != null) { Review comment: @merlimat I'm already fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services