[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185673143
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##
 @@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+    @Data
+    class MockLedgerEntry implements LedgerEntry {
+        public byte blockPadding = 0xB;
+        long ledgerId;
+        long entryId;
+        long length;
+        byte[] entryBytes;
+        ByteBuf entryBuffer;
+
+        MockLedgerEntry(long ledgerId, long entryId, long length) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.length = length;
+            this.entryBytes = new byte[(int) length];
+            entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+            entryBuffer.writerIndex(0);
+            IntStream.range(0, (int) length).forEach(i -> entryBuffer.writeByte(blockPadding));
+        }
+
+        @Override
+        public ByteBuffer getEntryNioBuffer() {
+            return null;
+        }
+
+        @Override
+        public LedgerEntry duplicate() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            entryBuffer.release();
+        }
+    }
+
+    @Data
+    class MockLedgerEntries implements LedgerEntries {
+        int ledgerId;
+        int startEntryId;
+        int count;
+        int entrySize;
+        List<LedgerEntry> entries;
+
+        MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) {
+            this.ledgerId = ledgerId;
+            this.startEntryId = startEntryId;
+            this.count = count;
+            this.entrySize = entrySize;
+            this.entries = Lists.newArrayList(count);
+
+            IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+                entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+        }
+
+        @Override
+        public void close() {
+            entries.clear();
+        }
+
+        @Override
+        public LedgerEntry getEntry(long entryId) {
+            if (entryId < startEntryId || entryId >= startEntryId + count) {
+                return null;
+            }
+
+            return entries.get((int) entryId - startEntryId);
+        }
+
+        @Override
+        public Iterator<LedgerEntry> iterator() {
+            return entries.iterator();
+        }
+    }
+
+    class MockReadHandle implements ReadHandle {
+        int ledgerId;
+        int entrySize;
+        int lac;
+
+        MockReadHandle(int ledgerId, int entrySize, int lac) {
+            this.ledgerId = ledgerId;
+            this.entrySize = entrySize;
+            this.lac = lac;
+
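The mock entries above are filled with a single padding byte so that a stream assembled from them can later be verified byte-for-byte. A stdlib-only analog of that technique (Netty-free; the class and method names here are illustrative, not part of the PR):

```java
import java.nio.ByteBuffer;

public class PaddedEntryDemo {
    // Build one mock "entry": `length` bytes all set to the padding value,
    // mirroring what MockLedgerEntry does with a Netty ByteBuf.
    static ByteBuffer paddedEntry(int length, byte padding) {
        ByteBuffer buf = ByteBuffer.allocate(length);
        for (int i = 0; i < length; i++) {
            buf.put(padding);
        }
        buf.flip(); // make the buffer readable from position 0
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer entry = paddedEntry(8, (byte) 0xB);
        // Every byte of the mock entry equals the padding value, so any
        // stream built over such entries can be checked exhaustively.
        while (entry.hasRemaining()) {
            if (entry.get() != (byte) 0xB) {
                throw new AssertionError("unexpected byte in mock entry");
            }
        }
        System.out.println("all bytes match the padding value");
    }
}
```

The same idea drives the real test: because the payload is uniform, assertions do not depend on entry boundaries.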

[GitHub] lucperkins commented on issue #1457: Schema registry documentation

2018-05-02 Thread GitBox
lucperkins commented on issue #1457: Schema registry documentation
URL: https://github.com/apache/incubator-pulsar/pull/1457#issuecomment-386153232
 
 
   @mgodave This is ready for review


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1709: Bumped version to 2.1.0-incubating-SNAPSHOT

2018-05-02 Thread GitBox
merlimat closed pull request #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
URL: https://github.com/apache/incubator-pulsar/pull/1709
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/all/pom.xml b/all/pom.xml
index 5251faec38..7ac9f54c2a 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index d83bb2c904..100a9c7a98 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.apache.pulsar</groupId>
   <artifactId>buildtools</artifactId>
-  <version>2.0.0-incubating-SNAPSHOT</version>
+  <version>2.1.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>Pulsar Build Tools</name>
 
diff --git a/docker/grafana/pom.xml b/docker/grafana/pom.xml
index 450eb827d5..0eaf5c8422 100644
--- a/docker/grafana/pom.xml
+++ b/docker/grafana/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>docker-images</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.pulsar</groupId>
diff --git a/docker/pom.xml b/docker/pom.xml
index ec4f8ca572..70f4480661 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
   </parent>
   <groupId>org.apache.pulsar</groupId>
   <artifactId>docker-images</artifactId>
diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml
index 63c42ca3d8..3cd9fce76b 100644
--- a/docker/pulsar/pom.xml
+++ b/docker/pulsar/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>docker-images</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.pulsar</groupId>
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 3ee7703994..870ed6a057 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pom.xml b/pom.xml
index 761c710fd2..c18a320682 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   <groupId>org.apache.pulsar</groupId>
   <artifactId>pulsar</artifactId>
 
-  <version>2.0.0-incubating-SNAPSHOT</version>
+  <version>2.1.0-incubating-SNAPSHOT</version>
 
   <name>Pulsar</name>
   Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml
index 48e78e87ac..c163c10f4a 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>pulsar-broker-auth-athenz</artifactId>
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 75edadbe82..bb5a639840 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>pulsar-broker-common</artifactId>
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index 5e77a95d5d..181d822084 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 2e1f22080a..0c9fe462f6 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pulsar-client-admin-shaded-for-functions/pom.xml b/pulsar-client-admin-shaded-for-functions/pom.xml
index 1e3833387f..444c4bfc52 100644
--- a/pulsar-client-admin-shaded-for-functions/pom.xml
+++ b/pulsar-client-admin-shaded-for-functions/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index df5f867fa6..ba8da01196 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index e0665b1179..702a29e246 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-    <version>2.0.0-incubating-SNAPSHOT</version>
+    <version>2.1.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml
index e7df0fc482..5acce5f831 100644
--- a/pulsar-client-auth-athenz/pom.xml
+++ b/pulsar-client-auth-athenz/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
-

[incubator-pulsar] branch master updated: Bumped version to 2.1.0-incubating-SNAPSHOT (#1709)

2018-05-02 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new efc893b  Bumped version to 2.1.0-incubating-SNAPSHOT (#1709)
efc893b is described below

commit efc893b6997f6181e9cd785c9fd023d7632736ab
Author: Matteo Merli 
AuthorDate: Wed May 2 16:35:54 2018 -0700

Bumped version to 2.1.0-incubating-SNAPSHOT (#1709)
---
 all/pom.xml   | 2 +-
 buildtools/pom.xml| 2 +-
 docker/grafana/pom.xml| 2 +-
 docker/pom.xml| 2 +-
 docker/pulsar/pom.xml | 2 +-
 managed-ledger/pom.xml| 2 +-
 pom.xml   | 2 +-
 pulsar-broker-auth-athenz/pom.xml | 2 +-
 pulsar-broker-common/pom.xml  | 2 +-
 pulsar-broker-shaded/pom.xml  | 2 +-
 pulsar-broker/pom.xml | 2 +-
 pulsar-client-admin-shaded-for-functions/pom.xml  | 2 +-
 pulsar-client-admin-shaded/pom.xml| 2 +-
 pulsar-client-admin/pom.xml   | 2 +-
 pulsar-client-auth-athenz/pom.xml | 2 +-
 pulsar-client-kafka-compat/pom.xml| 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml  | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml| 2 +-
 pulsar-client-shaded/pom.xml  | 2 +-
 pulsar-client-tools-test/pom.xml  | 2 +-
 pulsar-client-tools/pom.xml   | 2 +-
 pulsar-client/pom.xml | 2 +-
 pulsar-common/pom.xml | 2 +-
 pulsar-connect/aerospike/pom.xml  | 2 +-
 pulsar-connect/cassandra/pom.xml  | 2 +-
 pulsar-connect/core/pom.xml   | 2 +-
 pulsar-connect/kafka/pom.xml  | 2 +-
 pulsar-connect/pom.xml| 2 +-
 pulsar-connect/rabbitmq/pom.xml   | 2 +-
 pulsar-connect/twitter/pom.xml| 2 +-
 pulsar-discovery-service/pom.xml  | 2 +-
 pulsar-functions/api-java/pom.xml | 2 +-
 pulsar-functions/instance/pom.xml | 2 +-
 pulsar-functions/java-examples/pom.xml| 2 +-
 pulsar-functions/metrics/pom.xml  | 2 +-
 pulsar-functions/pom.xml  | 2 +-
 pulsar-functions/proto-shaded/pom.xml | 2 +-
 pulsar-functions/proto/pom.xml| 2 +-
 pulsar-functions/runtime-all/pom.xml  | 2 +-
 pulsar-functions/runtime-shaded/pom.xml   | 2 +-
 pulsar-functions/runtime/pom.xml  | 2 +-
 pulsar-functions/utils/pom.xml| 2 +-
 pulsar-functions/worker-shaded/pom.xml| 2 +-
 pulsar-functions/worker/pom.xml   | 2 +-
 pulsar-log4j2-appender/pom.xml| 2 +-
 pulsar-proxy/pom.xml  | 2 +-
 pulsar-spark/pom.xml  | 2 +-
 pulsar-storm/pom.xml  | 2 +-
 pulsar-testclient/pom.xml | 2 +-
 pulsar-websocket/pom.xml  | 2 +-
 pulsar-zookeeper-utils/pom.xml| 2 +-
 pulsar-zookeeper/pom.xml  | 2 +-
 tests/docker-images/latest-version-image/pom.xml  | 2 +-
 tests/docker-images/pom.xml   | 2 +-
 tests/integration-tests-base/pom.xml  | 2 +-
 tests/integration-tests-topologies/pom.xml| 2 +-
 tests/integration-tests-utils/pom.xml | 2 +-
 tests/integration/cli/pom.xml | 2 +-
 tests/integration/compaction/pom.xml  | 2 +-
 tests/integration/pom.xml | 2 +-
 tests/integration/smoke/pom.xml   | 2 +-
 tests/pom.xml | 2 +-
 63 files changed, 63 insertions(+), 63 deletions(-)

[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185667124
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 ##
 @@ -259,13 +295,34 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
 clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
 LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
 clientAuthRole);
+if (service.getConfiguration().forwardAuthorizationCredentials()) {
+this.clientAuthData = authData;
+this.clientAuthMethod = authMethod;
+}
+createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
 
 Review comment:
   As the name suggests, can we create and return the client in `createClient()`: `this.client = createClient(..)`?
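A minimal, self-contained sketch of the suggested shape (the `Client` type and field names here are stand-ins, not the actual Pulsar proxy API): the factory method builds and returns the instance, and the caller assigns it.

```java
public class CreateAndReturnDemo {
    // Hypothetical stand-in for the proxy's broker-side client.
    static class Client {
        final String authMethod;
        Client(String authMethod) { this.authMethod = authMethod; }
    }

    Client client;

    // The factory both creates and returns the instance, so call sites read
    // as `this.client = createClient(...)` instead of relying on a side
    // effect hidden inside createClient().
    Client createClient(String authData, String authMethod) {
        return new Client(authMethod);
    }

    void connect() {
        this.client = createClient("auth-data", "token");
    }

    public static void main(String[] args) {
        CreateAndReturnDemo demo = new CreateAndReturnDemo();
        demo.connect();
        System.out.println(demo.client.authMethod); // prints "token"
    }
}
```

Returning the value also makes the assignment visible at the call site, which is what the reviewer is asking for.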




[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185666025
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
 ##
 @@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+
+public class ProxyClientCnx extends ClientCnx {
+
+   String clientAuthRole;
+   String clientAuthData;
+   String clientAuthMethod;
+   
+   public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+   String clientAuthData, String clientAuthMethod) {
+   super(conf, eventLoopGroup);
+   this.clientAuthRole = clientAuthRole;
+   this.clientAuthData = clientAuthData;
+   this.clientAuthMethod = clientAuthMethod;
+   }
+
+   @Override
+   protected ByteBuf newConnectCommand() throws PulsarClientException {
+   if (log.isDebugEnabled()) {
+log.info(
+"New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
+clientAuthRole, clientAuthData, clientAuthMethod);
+   }
+   String authData = "";
 
 Review comment:
   can we initialize it with `null`?




[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668132
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
 ##
 @@ -65,15 +60,10 @@
 private ZooKeeperClientFactory zkClientFactory = null;
 
 private final EventLoopGroup acceptorGroup;
-private final EventLoopGroup workerGroup;
+final EventLoopGroup workerGroup;
 
 Review comment:
   can we keep it private?




[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185666469
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 ##
 @@ -229,14 +245,39 @@ protected void handleLookup(CommandLookupTopic lookup) {
 private void close() {
 state = State.Closed;
 ctx.close();
+try {
+client.close();
+} catch (PulsarClientException e) {
+LOG.error("Unable to clode pulsar client - {}", client);
 
 Review comment:
   typo: `close`, and add `e.getMessage()` to the log
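The corrected close path would fix the `clode` typo and include the exception message in the log line. A stdlib-only sketch of that pattern (the `Client` interface and `closeQuietly` helper are illustrative, not the PR's code):

```java
import java.util.logging.Logger;

public class CloseLoggingDemo {
    private static final Logger LOG = Logger.getLogger(CloseLoggingDemo.class.getName());

    // Stand-in for the Pulsar client being closed; a functional interface so
    // the demo can supply failing and succeeding implementations inline.
    interface Client extends AutoCloseable {
        @Override
        void close() throws Exception;
    }

    // Returns the logged message on failure, or null on a clean close.
    static String closeQuietly(Client client) {
        try {
            client.close();
            return null;
        } catch (Exception e) {
            // Include the exception message, as the reviewer suggests.
            String msg = "Unable to close pulsar client - " + e.getMessage();
            LOG.warning(msg);
            return msg;
        }
    }

    public static void main(String[] args) {
        String result = closeQuietly(() -> { throw new Exception("already closed"); });
        System.out.println(result); // prints "Unable to close pulsar client - already closed"
    }
}
```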




[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185667623
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 ##
 @@ -259,13 +295,34 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
 clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
 LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
 clientAuthRole);
+if (service.getConfiguration().forwardAuthorizationCredentials()) {
+this.clientAuthData = authData;
+this.clientAuthMethod = authMethod;
+}
+createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
 
 Review comment:
   also we don't want to share proxy-to-broker connection when 
`service.getConfiguration().forwardAuthorizationCredentials()` is enabled. 
else, we can always share proxy-to-broker connection for lookup. 
   so, should we create a new client only in case of 
`service.getConfiguration().forwardAuthorizationCredentials()=true` else we can 
always share it.?
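The trade-off the reviewer raises can be sketched in isolation (class and method names here are illustrative stand-ins, not the proxy's actual types): when credentials are forwarded, each proxied connection carries its own auth data and so needs its own broker client; otherwise one shared client can serve all lookups.

```java
public class ClientSharingDemo {
    // Stand-in for the proxy's broker client.
    static class Client {
    }

    private final boolean forwardAuthCredentials;
    private Client sharedClient; // lazily created, reused across connections

    ClientSharingDemo(boolean forwardAuthCredentials) {
        this.forwardAuthCredentials = forwardAuthCredentials;
    }

    // Per-connection client only when credentials must be forwarded (the
    // CONNECT command would carry per-client auth data); otherwise share.
    Client clientFor() {
        if (forwardAuthCredentials) {
            return new Client();
        }
        if (sharedClient == null) {
            sharedClient = new Client();
        }
        return sharedClient;
    }

    public static void main(String[] args) {
        ClientSharingDemo shared = new ClientSharingDemo(false);
        System.out.println(shared.clientFor() == shared.clientFor()); // prints "true"
        ClientSharingDemo perConn = new ClientSharingDemo(true);
        System.out.println(perConn.clientFor() == perConn.clientFor()); // prints "false"
    }
}
```

The sharing decision is made once per proxy configuration, so the conditional lives at client-acquisition time rather than in every lookup.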




[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668092
 
 

 ##
 File path: 
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 ##
 @@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class ProxyAuthenticationTest extends ProducerConsumerBase {
+   private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+
+   public static class BasicAuthenticationData implements AuthenticationDataProvider {
+   private String authParam;
+
+   public BasicAuthenticationData(String authParam) {
+   this.authParam = authParam;
+   }
+
+   public boolean hasDataFromCommand() {
+   return true;
+   }
+
+   public String getCommandData() {
+   return authParam;
+   }
+
+   public boolean hasDataForHttp() {
+   return true;
+   }
+
+   @Override
+   public Set<Entry<String, String>> getHttpHeaders() {
+   Map<String, String> headers = new HashMap<>();
+   headers.put("BasicAuthentication", authParam);
+   return headers.entrySet();
+   }
+   }
+
+   public static class BasicAuthentication implements Authentication {
+
+   private String authParam;
+
+   @Override
+   public void close() throws IOException {
+   // noop
+   }
+
+   @Override
+   public String getAuthMethodName() {
+   return "BasicAuthentication";
+   }
+
+   @Override
+   public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+   try {
+   return new BasicAuthenticationData(authParam);
+   } catch (Exception e) {
+   throw new PulsarClientException(e);
+   }
+   }
+
+   @Override
+   public void configure(Map<String, String> authParams) {
+   this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
+   

[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668018
 
 

 ##
 File path: 
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 ##
 @@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class ProxyAuthenticationTest extends ProducerConsumerBase {
+   private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+
+   public static class BasicAuthenticationData implements AuthenticationDataProvider {
+   private String authParam;
+
+   public BasicAuthenticationData(String authParam) {
+   this.authParam = authParam;
+   }
+
+   public boolean hasDataFromCommand() {
+   return true;
+   }
+
+   public String getCommandData() {
+   return authParam;
+   }
+
+   public boolean hasDataForHttp() {
+   return true;
+   }
+
+   @Override
+   public Set<Entry<String, String>> getHttpHeaders() {
+   Map<String, String> headers = new HashMap<>();
+   headers.put("BasicAuthentication", authParam);
+   return headers.entrySet();
+   }
+   }
+
+   public static class BasicAuthentication implements Authentication {
+
+   private String authParam;
+
+   @Override
+   public void close() throws IOException {
+   // noop
+   }
+
+   @Override
+   public String getAuthMethodName() {
+   return "BasicAuthentication";
+   }
+
+   @Override
+   public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+   try {
+   return new BasicAuthenticationData(authParam);
+   } catch (Exception e) {
+   throw new PulsarClientException(e);
+   }
+   }
+
+   @Override
+   public void configure(Map<String, String> authParams) {
+   this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
+   

[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
rdhabalia commented on a change in pull request #1707: Fixed authentication 
flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185665960
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
 ##
 @@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+
+public class ProxyClientCnx extends ClientCnx {
+
+   String clientAuthRole;
+   String clientAuthData;
+   String clientAuthMethod;
+   
+   public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+   String clientAuthData, String clientAuthMethod) {
+   super(conf, eventLoopGroup);
+   this.clientAuthRole = clientAuthRole;
+   this.clientAuthData = clientAuthData;
+   this.clientAuthMethod = clientAuthMethod;
+   }
+
+   @Override
+   protected ByteBuf newConnectCommand() throws PulsarClientException {
+   if (log.isDebugEnabled()) {
+log.info(
 
 Review comment:
   `log.debug(..)`
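
The comment flags a level mismatch: the new code guards with `log.isDebugEnabled()` but then calls `log.info(...)`, so the message is dropped exactly when DEBUG is disabled while INFO is enabled. A minimal sketch of the agreed pattern, using a hypothetical recording logger in place of slf4j (class and method names here are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class GuardedDebugSketch {

    // Hypothetical stand-in for an slf4j logger, just to make the level
    // mismatch observable; ProxyClientCnx uses the inherited `log` field.
    static class RecordingLogger {
        final List<String> messages = new ArrayList<>();
        private final boolean debugEnabled;

        RecordingLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        boolean isDebugEnabled() {
            return debugEnabled;
        }

        void debug(String msg) {
            // A real logger re-checks the level internally; the outer guard
            // only avoids building the message string when DEBUG is off.
            if (debugEnabled) {
                messages.add(msg);
            }
        }
    }

    // Correct pattern: the guard and the call agree on the DEBUG level.
    static int logConnect(RecordingLogger log, String clientAuthRole) {
        if (log.isDebugEnabled()) {
            log.debug("New connect command with clientAuthRole = " + clientAuthRole);
        }
        return log.messages.size();
    }
}
```

With slf4j's parameterized logging (`log.debug("role = {}", role)`) the guard is often unnecessary, since message formatting is deferred until the level check passes.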


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185665085
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##
 @@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStreamImpl for each cold storage data block.
+ * It reads data from a ledger and streams out the content of one data block:
+ * DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]]) + padding
+ */
+public class BlockAwareSegmentInputStreamImpl extends InputStream implements 
BlockAwareSegmentInputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
+
+private static final byte[] BLOCK_END_PADDING = 
Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries that have been read from the ledger and streamed out of this InputStream.
+private int blockEntryCount;
+
+// tracking read status for both header and entries:
+// bytes that have already been read from this InputStream
+private int bytesReadOffset = 0;
+// Bytes from this index onward are all padding
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries to read from the ReadHandle each time.
+private static final int ENTRIES_PER_READ = 100;
+// buffer size needed for the entry size and entry id.
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+// Keep a list of all entries' ByteBufs; each ByteBuf contains 2 bufs: entry header and entry content.
+private List<ByteBuf> entriesByteBuf = null;
+
+public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long 
startEntryId, int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < blockSize);
+
+// once the end of the entry buffer is reached, start a new read.
+if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
 
 Review comment:
  OK. The current code tries to release buffers earlier and to make each call of `readEntries` do less work.
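
The block layout quoted in the class javadoc — `[entry_size -- int][entry_id -- long][entry_data]` per entry — is why `ENTRY_HEADER_SIZE` is 4 + 8 bytes. A minimal sketch of building and parsing that 12-byte entry header (helper names are illustrative, not from the PR):

```java
import java.nio.ByteBuffer;

public class EntryHeaderSketch {
    // Matches the block layout in the class javadoc:
    // [entry_size -- int][entry_id -- long][entry_data]
    static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;

    // Build the 12-byte header that precedes each entry's payload
    // (big-endian, ByteBuffer's default order).
    static byte[] buildEntryHeader(int entrySize, long entryId) {
        return ByteBuffer.allocate(ENTRY_HEADER_SIZE)
                .putInt(entrySize)
                .putLong(entryId)
                .array();
    }

    // Absolute reads: the int sits at offset 0, the long at offset 4.
    static int readEntrySize(byte[] header) {
        return ByteBuffer.wrap(header).getInt(0);
    }

    static long readEntryId(byte[] header) {
        return ByteBuffer.wrap(header).getLong(4);
    }
}
```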




[GitHub] sijie commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT

2018-05-02 Thread GitBox
sijie commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
URL: https://github.com/apache/incubator-pulsar/pull/1709#issuecomment-386132512
 
 
   retest this please




[GitHub] sijie commented on a change in pull request #1719: Used shaded version of protobuf-2.4.1 internally, to coexist with components using proto3

2018-05-02 Thread GitBox
sijie commented on a change in pull request #1719: Used shaded version of 
protobuf-2.4.1 internally, to coexist with components using proto3
URL: https://github.com/apache/incubator-pulsar/pull/1719#discussion_r185649007
 
 

 ##
 File path: protobuf-shaded/pom.xml
 ##
 @@ -0,0 +1,76 @@
+
+
+

[incubator-pulsar] branch master updated: Version links in release notes (#1721)

2018-05-02 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 630e119  Version links in release notes (#1721)
630e119 is described below

commit 630e1190255a14b483a651a0d248a5a0f6ac40aa
Author: Luc Perkins 
AuthorDate: Wed May 2 14:44:26 2018 -0700

Version links in release notes (#1721)

This enables people to easily link to release notes for specific versions. 
In the future, this page should be templated and built from source files rather 
than raw Markdown, but for now this provides an easy stopgap solution.
---
 site/release-notes.md | 52 +--
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/site/release-notes.md b/site/release-notes.md
index 629851a..4e865b8 100644
--- a/site/release-notes.md
+++ b/site/release-notes.md
@@ -26,7 +26,7 @@ layout: content
 
 ## Apache incubator
 
-### 1.22.0-incubating  2018-03-06
+### 1.22.0-incubating  2018-03-06 
 
 This is the fourth release of Apache Pulsar since entering the ASF incubator.
 
@@ -71,7 +71,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/11?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating
 
-### 1.21.0-incubating  2017-12-17
+### 1.21.0-incubating  2017-12-17 
 
 This is the third release of Apache Pulsar since entering the ASF incubator.
 
@@ -93,7 +93,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/10?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.21.0-incubating
 
-### 1.20.0-incubating  2017-08-08
+### 1.20.0-incubating  2017-08-08 
 
 This is the second release of Apache Pulsar since entering the ASF incubator.
 
@@ -111,7 +111,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/9?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.20.0-incubating
 
-### 1.19.0-incubating  2017-08-08
+### 1.19.0-incubating  2017-08-08 
 
 This is the first release of Apache Pulsar since entering the ASF incubator.
 
@@ -131,7 +131,7 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.19.0-incubating
 
 ## Pre-Apache
 
-### 1.18  2017-06-17
+### 1.18  2017-06-17 
 
 Main changes:
  * [#325](https://github.com/apache/incubator-pulsar/pull/325) Add Modular 
load manager documentation
@@ -172,7 +172,7 @@ Full list of changes: 
https://github.com/yahoo/pulsar/milestone/7?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.18
 
-### 1.17.5  2017-05-02
+### 1.17.5  2017-05-02 
 
  * [#343](https://github.com/apache/incubator-pulsar/pull/343) Fix 
ModularLoadManager to select broker from current available-broker list
  * [#384](https://github.com/apache/incubator-pulsar/pull/384) Fix Send replay 
entries read callback from background thread, to avoid recursive stack calls
@@ -181,7 +181,7 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.18
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5
 
-### 1.17.4  2017-04-25
+### 1.17.4  2017-04-25 
 
  * [#362](https://github.com/apache/incubator-pulsar/pull/362) Fix add timeout 
on blocking ZookeeperCache get call
  * [#375](https://github.com/apache/incubator-pulsar/pull/375) Fix possible 
deadlock on topic loading if broker fails to get MLConfiguration from zk
@@ -189,13 +189,13 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.4
 
-### 1.17.3  2017-04-20
+### 1.17.3  2017-04-20 
 
  * [#367](https://github.com/apache/incubator-pulsar/pull/367) Fix dispatcher 
correctly finds available consumer from list of shared-subscription consumers
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3
 
-### 1.17.2  2017-04-06
+### 1.17.2  2017-04-06 
 
  * [#327](https://github.com/apache/incubator-pulsar/pull/327) Create znode 
for dynamic configuration if not present
  * [#336](https://github.com/apache/incubator-pulsar/pull/336) Fix prevent 
creation of topic when bundle is disable
@@ -203,13 +203,13 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.2
 
-### 1.17.1  2017-03-30
+### 1.17.1  2017-03-30 
 
  * [#326](https://github.com/apache/incubator-pulsar/pull/326) Fix memory leak 
while duplicating entry data from existing entry
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.1
 
-### 1.17  2017-03-30
+### 1.17  2017-03-30 
 
 Main changes:
 
@@ -239,13 +239,13 @@ Full list of changes: 
https://github.com/apache/incubator-pulsar/milestone/3?clo
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17
 
-### 1.16.5  2017-03-10
+### 1.16.5  2017-03-10 
 
  * [#311](https://github.com/apache/incubator-pulsar/pull/311) Exclude netty 
individual jars from binary distribution. This issue was causing binary 

[GitHub] sijie closed pull request #1720: Fix broken image

2018-05-02 Thread GitBox
sijie closed pull request #1720: Fix broken image
URL: https://github.com/apache/incubator-pulsar/pull/1720
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md 
b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 817b3eb6f0..2efabeaf8c 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -147,7 +147,7 @@ When the master consumer disconnects, all (non-acked and 
subsequent) messages wi
 
 In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 
would be the next in line to receive messages if Consumer-C-2 disconnected.
 
-{% include figure.html src="/img/failover-subscriptions.png" alt="Shared 
subscriptions" width="80" %}
+{% include figure.html src="/img/failover-subscriptions.png" alt="Failover 
subscriptions" width="80" %}
 
 ### Multi-topic subscriptions
 
diff --git a/site/img/failover-subscriptions.png 
b/site/img/failover-subscriptions.png
new file mode 100644
index 00..2cf83fc1c5
Binary files /dev/null and b/site/img/failover-subscriptions.png differ
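
The failover rule the corrected alt text describes — one master consumer receives all messages, and the next in line takes over when it disconnects — can be sketched as a simple pick-first policy (class and method names are illustrative, not Pulsar's implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class FailoverPickSketch {
    // The master consumer is simply the first in line; e.g. Consumer-C-1
    // in the diagram, with Consumer-C-2 next in line.
    static String masterConsumer(List<String> consumers) {
        return consumers.isEmpty() ? null : consumers.get(0);
    }

    // When a consumer disconnects, it leaves the line; the remaining
    // head of the list becomes the new master.
    static List<String> disconnect(List<String> consumers, String gone) {
        List<String> remaining = new ArrayList<>(consumers);
        remaining.remove(gone);
        return remaining;
    }
}
```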


 




[incubator-pulsar] branch master updated: Fix broken image (#1720)

2018-05-02 Thread sijie

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e9f8d9d  Fix broken image (#1720)
e9f8d9d is described below

commit e9f8d9d7b37721a3c27ffc5264663dbccfafcd75
Author: Luc Perkins 
AuthorDate: Wed May 2 14:44:54 2018 -0700

Fix broken image (#1720)

* add missing image

* update image

* one more small change to image
---
 .../latest/getting-started/ConceptsAndArchitecture.md   |   2 +-
 site/img/failover-subscriptions.png | Bin 0 -> 64604 bytes
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md 
b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 817b3eb..2efabea 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -147,7 +147,7 @@ When the master consumer disconnects, all (non-acked and 
subsequent) messages wi
 
 In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 
would be the next in line to receive messages if Consumer-C-2 disconnected.
 
-{% include figure.html src="/img/failover-subscriptions.png" alt="Shared 
subscriptions" width="80" %}
+{% include figure.html src="/img/failover-subscriptions.png" alt="Failover 
subscriptions" width="80" %}
 
 ### Multi-topic subscriptions
 
diff --git a/site/img/failover-subscriptions.png 
b/site/img/failover-subscriptions.png
new file mode 100644
index 000..2cf83fc
Binary files /dev/null and b/site/img/failover-subscriptions.png differ

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.


[GitHub] sijie closed pull request #1721: Version links in release notes

2018-05-02 Thread GitBox
sijie closed pull request #1721: Version links in release notes
URL: https://github.com/apache/incubator-pulsar/pull/1721
 
 
   


diff --git a/site/release-notes.md b/site/release-notes.md
index 629851a0b9..4e865b85cd 100644
--- a/site/release-notes.md
+++ b/site/release-notes.md
@@ -26,7 +26,7 @@ layout: content
 
 ## Apache incubator
 
-### 1.22.0-incubating  2018-03-06
+### 1.22.0-incubating  2018-03-06 
 
 This is the fourth release of Apache Pulsar since entering the ASF incubator.
 
@@ -71,7 +71,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/11?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating
 
-### 1.21.0-incubating  2017-12-17
+### 1.21.0-incubating  2017-12-17 
 
 This is the third release of Apache Pulsar since entering the ASF incubator.
 
@@ -93,7 +93,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/10?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.21.0-incubating
 
-### 1.20.0-incubating  2017-08-08
+### 1.20.0-incubating  2017-08-08 
 
 This is the second release of Apache Pulsar since entering the ASF incubator.
 
@@ -111,7 +111,7 @@ 
https://github.com/apache/incubator-pulsar/milestone/9?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.20.0-incubating
 
-### 1.19.0-incubating  2017-08-08
+### 1.19.0-incubating  2017-08-08 
 
 This is the first release of Apache Pulsar since entering the ASF incubator.
 
@@ -131,7 +131,7 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.19.0-incubating
 
 ## Pre-Apache
 
-### 1.18  2017-06-17
+### 1.18  2017-06-17 
 
 Main changes:
  * [#325](https://github.com/apache/incubator-pulsar/pull/325) Add Modular 
load manager documentation
@@ -172,7 +172,7 @@ Full list of changes: 
https://github.com/yahoo/pulsar/milestone/7?closed=1
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.18
 
-### 1.17.5  2017-05-02
+### 1.17.5  2017-05-02 
 
  * [#343](https://github.com/apache/incubator-pulsar/pull/343) Fix 
ModularLoadManager to select broker from current available-broker list
  * [#384](https://github.com/apache/incubator-pulsar/pull/384) Fix Send replay 
entries read callback from background thread, to avoid recursive stack calls
@@ -181,7 +181,7 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.18
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5
 
-### 1.17.4  2017-04-25
+### 1.17.4  2017-04-25 
 
  * [#362](https://github.com/apache/incubator-pulsar/pull/362) Fix add timeout 
on blocking ZookeeperCache get call
  * [#375](https://github.com/apache/incubator-pulsar/pull/375) Fix possible 
deadlock on topic loading if broker fails to get MLConfiguration from zk
@@ -189,13 +189,13 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.4
 
-### 1.17.3  2017-04-20
+### 1.17.3  2017-04-20 
 
  * [#367](https://github.com/apache/incubator-pulsar/pull/367) Fix dispatcher 
correctly finds available consumer from list of shared-subscription consumers
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3
 
-### 1.17.2  2017-04-06
+### 1.17.2  2017-04-06 
 
  * [#327](https://github.com/apache/incubator-pulsar/pull/327) Create znode 
for dynamic configuration if not present
  * [#336](https://github.com/apache/incubator-pulsar/pull/336) Fix prevent 
creation of topic when bundle is disable
@@ -203,13 +203,13 @@ 
https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.2
 
-### 1.17.1  2017-03-30
+### 1.17.1  2017-03-30 
 
  * [#326](https://github.com/apache/incubator-pulsar/pull/326) Fix memory leak 
while duplicating entry data from existing entry
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.1
 
-### 1.17  2017-03-30
+### 1.17  2017-03-30 
 
 Main changes:
 
@@ -239,13 +239,13 @@ Full list of changes: 
https://github.com/apache/incubator-pulsar/milestone/3?clo
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.17
 
-### 1.16.5  2017-03-10
+### 1.16.5  2017-03-10 
 
  * [#311](https://github.com/apache/incubator-pulsar/pull/311) Exclude netty 
individual jars from binary distribution. This issue was causing binary 
distribution to have conflicting netty dependencies.
 
 https://github.com/apache/incubator-pulsar/releases/tag/v1.16.5
 
-### 1.16.4  2017-03-10
+### 1.16.4  2017-03-10 
 
  * [#265](https://github.com/apache/incubator-pulsar/pull/265) Fix client 
closes http-connection on internal-server error
  * [#283](https://github.com/apache/incubator-pulsar/pull/283) Fix recycle 
keep alive command-object properly
@@ -253,19 +253,19 @@ 

[incubator-pulsar] branch master updated: Refactor functions to use Sink interface (#1708)

2018-05-02 Thread sijie

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new c3c8013  Refactor functions to use Sink interface (#1708)
c3c8013 is described below

commit c3c8013d701d986970ec9baef142c338233f2f89
Author: Boyang Jerry Peng 
AuthorDate: Wed May 2 14:43:10 2018 -0700

Refactor functions to use Sink interface (#1708)

* use pulsar sink

* removing exception from interface

* addressing various comments

* changing MultiConsumersOneOuputTopicProducers to use String for partition 
id
---
 .../java/org/apache/pulsar/connect/core/Sink.java  |   2 +-
 .../functions/instance/JavaInstanceRunnable.java   | 212 --
 .../instance/processors/AtLeastOnceProcessor.java  |  77 ---
 .../instance/processors/AtMostOnceProcessor.java   |  79 ---
 .../processors/EffectivelyOnceProcessor.java   | 120 --
 .../instance/processors/MessageProcessor.java  | 107 -
 .../instance/processors/MessageProcessorBase.java  | 155 -
 .../MultiConsumersOneOuputTopicProducers.java  |  20 +-
 .../functions/instance/producers/Producers.java|   4 +-
 .../pulsar/functions/sink/DefaultRuntimeSink.java  |   1 -
 .../apache/pulsar/functions/sink/PulsarSink.java   | 247 -
 .../PulsarSinkConfig.java} |  19 +-
 .../apache/pulsar/functions/sink/RuntimeSink.java  |   5 +-
 .../pulsar/functions/source/PulsarSource.java  |  30 +--
 .../{PulsarConfig.java => PulsarSourceConfig.java} |   9 +-
 .../src/main/python/python_instance_main.py|  16 +-
 .../instance/JavaInstanceRunnableTest.java |  81 ---
 .../MultiConsumersOneOutputTopicProducersTest.java |   2 +-
 .../functions/sink/DefaultRuntimeSinkTest.java |   6 +-
 .../PulsarSinkTest.java}   | 136 +---
 .../pulsar/functions/source/PulsarSourceTest.java  |  90 +++-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  40 ++--
 .../pulsar/functions/runtime/ProcessRuntime.java   |  35 ++-
 .../functions/runtime/ProcessRuntimeTest.java  |  20 +-
 24 files changed, 683 insertions(+), 830 deletions(-)

diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index ca569e7..cd2d63d 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
 @@ -46,4 +46,4 @@ public interface Sink<T> extends AutoCloseable {
   * @return Completable future for async publish request
   */
 CompletableFuture<Void> write(T value);
-}
\ No newline at end of file
+}
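
With `write` returning a `CompletableFuture`, sink implementations can publish asynchronously and callers can chain acknowledgement on completion. A minimal sketch of implementing the interface shape shown in the diff above (the in-memory sink is hypothetical, for tests only):

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Shape of the connect-core interface from the diff: write() is async.
interface Sink<T> extends AutoCloseable {
    CompletableFuture<Void> write(T value);
}

// Hypothetical in-memory sink, handy in tests: each write completes
// immediately since there is no remote system to wait for.
public class InMemorySink<T> implements Sink<T> {
    final Queue<T> written = new ConcurrentLinkedQueue<>();

    @Override
    public CompletableFuture<Void> write(T value) {
        written.add(value);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        written.clear();
    }
}
```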
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414d..2e8037b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,15 +22,13 @@ package org.apache.pulsar.functions.instance;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -49,18 +47,23 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import 

[GitHub] sijie closed pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
sijie closed pull request #1708: Refactor functions to use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708
 
 
   


diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index ca569e794c..cd2d63d242 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -46,4 +46,4 @@
  * @return Completable future for async publish request
  */
 CompletableFuture<Void> write(T value);
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414dc7a..2e8037b814 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,15 +22,13 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -49,18 +47,23 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.RuntimeSink;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -93,18 +96,14 @@
 @Getter
 private Exception deathException;
 
-@Getter(AccessLevel.PACKAGE)
-private SerDe outputSerDe;
-
-@Getter(AccessLevel.PACKAGE)
-// processor
-private final MessageProcessor processor;
-
 // function stats
 private final FunctionStats stats;
 
 private Record currentRecord;
 
+private Source source;
+private RuntimeSink sink;
+
 public JavaInstanceRunnable(InstanceConfig instanceConfig,
 FunctionCacheManager fnCache,
 String jarFile,
@@ -116,9 +115,6 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
 this.client = (PulsarClientImpl) pulsarClient;
 this.stateStorageServiceUrl = stateStorageServiceUrl;
 this.stats = new FunctionStats();
-this.processor = MessageProcessor.create(
-client,
-instanceConfig.getFunctionDetails());
 }
 
 /**
@@ -151,19 +147,16 @@ JavaInstance setupJavaInstance() throws Exception {
 typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
 }
 
-// setup serde
-setupSerDe(typeArgs, clsLoader);
-
 // start the state table
 setupStateTable();
 // start the output producer
-processor.setupOutput(outputSerDe);
+setupOutput(typeArgs[1]);
 // start the input consumer
-   

[incubator-pulsar] annotated tag v1.22.1-incubating-candidate-1 updated (741174e -> 3151f61)

2018-05-02 Thread jai1

jai1 pushed a change to annotated tag v1.22.1-incubating-candidate-1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


*** WARNING: tag v1.22.1-incubating-candidate-1 was modified! ***

from 741174e  (commit)
  to 3151f61  (tag)
 tagging 741174e6b0a46592a46fe2944ef292f08bffa347 (commit)
 replaces v1.22.0-incubating
  by jai1
  on Wed May 2 13:30:43 2018 -0700

- Log -
Release v1.22.1-incubating-candidate-1
-BEGIN PGP SIGNATURE-

iQJEBAABCAAuFiEEDQCP4t9TLRC/fG0se6GmTLvBFOwFAlrqH/MQHGphaTFAYXBh
Y2hlLm9yZwAKCRB7oaZMu8EU7HnnEACnKg275Ih7aXXK5u6L6qbMj5ELicjWxB8e
EF4My0e7+H1KyRTez1PnMmCMel4w3twFUjSg4VMbyh9N9ts36Zz9kC4NfecP1xPI
NXR9QG1DxKwdK3OTfFnZQO4H7KDV0F9O20jNiw2QiVX8gitEf4cQPT3RX1Djcbfm
2/TlkZheaBQay/lDUayU4VTjnTG8aJZ5ZSEwy+pC1n6sgZXq0YN60afj5eDXet2o
tRuOSazGmxtBJKk6PVC7pCOgUEyYVxUEFcH8qn67P5Gdu4s4HUL3gxVrXSrR9Zd0
9ne49yReU4+XGhW+nIg0kenVvl5+Yezt+vD2yqvIFbQxgEZHKCjaVk9XjnCa42js
/592iryYDiwDulefkWhybH2bg1E17szCQiN3gLKAn51Bc90nY/V5Hy752U2S7IgE
W4zgDnA8LgMKR9Dr2i0fv+JlO59Ki2meBpDBjH09eYseidVc0a2Dh+xsoQvWIkc7
qn7QD9ElbMw6LO2R1n13g9Cq89kMIplr8T4N3WO4WxLa2ILgqZPAnABQPW42tIIZ
JDrLEzfvmF1WXM43mQV4a/3SWPS7VaP/mUCFh2n7bwX+KKaWiYO21M9IWvymghql
8J9iA7wXF0Q0pP7mF6JR9Cswtsl+ZlrGC46qXel9cUAPXbeSv6PaV3sj5RQQtpuC
c9s4uY2U/Q==
=Q5kY
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.


[incubator-pulsar] branch branch-1.22 updated: Release 1.22.1-incubating

2018-05-02 Thread jai1

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new 741174e  Release 1.22.1-incubating
741174e is described below

commit 741174e6b0a46592a46fe2944ef292f08bffa347
Author: jai1 
AuthorDate: Wed May 2 13:29:23 2018 -0700

Release 1.22.1-incubating
---
 all/pom.xml  | 2 +-
 buildtools/pom.xml   | 2 +-
 managed-ledger/pom.xml   | 2 +-
 pom.xml  | 2 +-
 pulsar-broker-auth-athenz/pom.xml| 2 +-
 pulsar-broker-common/pom.xml | 2 +-
 pulsar-broker-shaded/pom.xml | 2 +-
 pulsar-broker/pom.xml| 2 +-
 pulsar-checksum/pom.xml  | 2 +-
 pulsar-client-admin-shaded/pom.xml   | 2 +-
 pulsar-client-admin/pom.xml  | 2 +-
 pulsar-client-auth-athenz/pom.xml| 2 +-
 pulsar-client-kafka-compat/pom.xml   | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml   | 2 +-
 pulsar-client-shaded/pom.xml | 2 +-
 pulsar-client-tools/pom.xml  | 2 +-
 pulsar-client/pom.xml| 2 +-
 pulsar-common/pom.xml| 2 +-
 pulsar-discovery-service/pom.xml | 2 +-
 pulsar-proxy/pom.xml | 2 +-
 pulsar-spark/pom.xml | 2 +-
 pulsar-storm/pom.xml | 2 +-
 pulsar-testclient/pom.xml| 2 +-
 pulsar-websocket/pom.xml | 2 +-
 pulsar-zookeeper-utils/pom.xml   | 2 +-
 pulsar-zookeeper/pom.xml | 2 +-
 27 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/all/pom.xml b/all/pom.xml
index 4283f55..5f7ae07 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 11a3bd9..2e42b9b 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 0cdff4a..f4bf035 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/pom.xml b/pom.xml
index daa6c5a..16b1eca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   org.apache.pulsar
   pulsar
 
-  1.22.0-incubating
+  1.22.1-incubating
 
   Pulsar
   Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml 
b/pulsar-broker-auth-athenz/pom.xml
index 0e5f6b1..3a24bd2 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
   
 
   pulsar-broker-auth-athenz
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index b65323f..940668a 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
   
 
   pulsar-broker-common
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index 6d552bf..efc43c5 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index f5c6528..868e1f4 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml
index ff0a63a..837286b 100644
--- a/pulsar-checksum/pom.xml
+++ b/pulsar-checksum/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating
+1.22.1-incubating
 ..
   
 
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index f72243f..c3c3928 100644
--- 

[GitHub] lucperkins opened a new pull request #1721: Version links in release notes

2018-05-02 Thread GitBox
lucperkins opened a new pull request #1721: Version links in release notes
URL: https://github.com/apache/incubator-pulsar/pull/1721
 
 
   This enables people to easily link to release notes for specific versions. 
In the future, this page should be templated and built from source files rather 
than raw Markdown, but for now this provides an easy stopgap solution.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jai1 closed pull request #1715: Broker should not start replicator for root partitioned-topic

2018-05-02 Thread GitBox
jai1 closed pull request #1715: Broker should not start replicator for root 
partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1715
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5ff3..d5480ccc8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +62,9 @@
 Stopped, Starting, Started, Stopping
 }
 
-public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-String remoteCluster, BrokerService brokerService) {
+public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+BrokerService brokerService) throws NamingException {
+validatePartitionedTopic(topicName, brokerService);
 this.brokerService = brokerService;
 this.topicName = topicName;
 this.replicatorPrefix = replicatorPrefix;
@@ -67,7 +73,6 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
 this.client = (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster);
 this.producer = null;
 this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
-
 this.producerConfiguration = new ProducerConfiguration();
 this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
 this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
@@ -214,5 +219,42 @@ public static String getReplicatorName(String 
replicatorPrefix, String cluster)
 return (replicatorPrefix + "." + cluster).intern();
 }
 
+/**
+ * Replication can't be started on a root partitioned topic, to avoid a producer startup conflict.
+ *
+ * For example, if topic persistent://prop/cluster/ns/my-topic is a partitioned topic with 2
+ * partitions, the broker explicitly creates replicator producers for "my-topic-partition-1"
+ * and "my-topic-partition-2".
+ *
+ * However, if the broker tries to start a producer with the root topic "my-topic", the client
+ * library internally creates individual producers for "my-topic-partition-1" and
+ * "my-topic-partition-2", which conflicts with the existing replicator producers.
+ *
+ * Therefore, a replicator can't be started on a root partitioned topic, which would internally
+ * create multiple per-partition producers.
+ *
+ * @param topicName
+ * @param brokerService
+ */
+private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+DestinationName destination = DestinationName.get(topicName);
+String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+destination.getNamespace().toString(), 
destination.getDomain().toString(),
+destination.getEncodedLocalName());
+boolean isPartitionedTopic = false;
+try {
+isPartitionedTopic = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+.get(partitionedTopicPath).isPresent();
+} catch (Exception e) {
+log.warn("Failed to verify partitioned topic {}-{}", topicName, 
e.getMessage());
+}
+if (isPartitionedTopic) {
+throw new NamingException(
+topicName + " is a partitioned-topic and replication can't 
be started for partitioned-producer ");
+}
+}
+
 private 
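Read on its own, the validation boils down to "reject the root name when the topic is known to be partitioned." Below is a minimal, self-contained sketch of that idea — not Pulsar's actual classes: the real code consults the broker's ZooKeeper policies cache via AdminResource, which is simulated here with a plain set, and `ReplicatorTopicCheck` with its members are hypothetical names.

```java
import java.util.Set;

public class ReplicatorTopicCheck {
    // Stand-in for the partitioned-topic metadata lookup (ZooKeeper policies cache in the broker).
    static final Set<String> partitionedTopics =
            Set.of("persistent://prop/cluster/ns/my-topic");

    // A replicator may only be created for individual partitions,
    // never for the root name of a partitioned topic.
    static void validateNotRootPartitionedTopic(String topicName) {
        if (partitionedTopics.contains(topicName)) {
            throw new IllegalArgumentException(topicName
                    + " is a partitioned topic; start replicators on its partitions instead");
        }
    }

    public static void main(String[] args) {
        // Per-partition names pass the check silently.
        validateNotRootPartitionedTopic("persistent://prop/cluster/ns/my-topic-partition-1");
        try {
            validateNotRootPartitionedTopic("persistent://prop/cluster/ns/my-topic");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected root topic"); // prints rejected root topic
        }
    }
}
```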

[incubator-pulsar] branch branch-1.22 updated: Broker should not start replicator for root partitioned-topic (#1715)

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new ca7559b  Broker should not start replicator for root partitioned-topic 
(#1715)
ca7559b is described below

commit ca7559bd4e54eff3b3be8ba01263805da162bee0
Author: Jai Asher 
AuthorDate: Wed May 2 13:15:24 2018 -0700

Broker should not start replicator for root partitioned-topic (#1715)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 48 ++-
 .../pulsar/broker/service/BrokerService.java   | 53 +---
 .../nonpersistent/NonPersistentReplicator.java |  3 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 43 -
 .../service/persistent/PersistentReplicator.java   |  3 +-
 .../broker/service/persistent/PersistentTopic.java | 39 ++--
 .../pulsar/broker/service/ReplicatorTest.java  | 72 ++
 .../pulsar/broker/service/ReplicatorTestBase.java  |  3 +
 8 files changed, 216 insertions(+), 48 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5..d5480cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +62,9 @@ public abstract class AbstractReplicator {
 Stopped, Starting, Started, Stopping
 }
 
-public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-String remoteCluster, BrokerService brokerService) {
+public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+BrokerService brokerService) throws NamingException {
+validatePartitionedTopic(topicName, brokerService);
 this.brokerService = brokerService;
 this.topicName = topicName;
 this.replicatorPrefix = replicatorPrefix;
@@ -67,7 +73,6 @@ public abstract class AbstractReplicator {
 this.client = (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster);
 this.producer = null;
 this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
-
 this.producerConfiguration = new ProducerConfiguration();
 this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
 this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
@@ -214,5 +219,42 @@ public abstract class AbstractReplicator {
 return (replicatorPrefix + "." + cluster).intern();
 }
 
+/**
+ * Replication can't be started on a root partitioned topic, to avoid a producer startup conflict.
+ *
+ * For example, if topic persistent://prop/cluster/ns/my-topic is a partitioned topic with 2
+ * partitions, the broker explicitly creates replicator producers for "my-topic-partition-1"
+ * and "my-topic-partition-2".
+ *
+ * However, if the broker tries to start a producer with the root topic "my-topic", the client
+ * library internally creates individual producers for "my-topic-partition-1" and
+ * "my-topic-partition-2", which conflicts with the existing replicator producers.
+ *
+ * Therefore, a replicator can't be started on a root partitioned topic, which would internally
+ * create multiple per-partition producers.
+ *
+ * @param topicName
+ * @param brokerService
+ */
+private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+DestinationName destination = DestinationName.get(topicName);
+String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+

[incubator-pulsar] branch branch-1.22 updated (1875841 -> 5484585)

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a change to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


from 1875841  Fix: handle invalid markDelete position at managed-cursor 
(#1718)
 add 5484585  Fix: deadlock while closing non-shared consumer (#1716)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/service/persistent/PersistentSubscription.java  | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)



[GitHub] jai1 closed pull request #1716: Fix: deadlock while closing non-shared consumer

2018-05-02 Thread GitBox
jai1 closed pull request #1716: Fix: deadlock while closing non-shared consumer
URL: https://github.com/apache/incubator-pulsar/pull/1716
 
 
   


diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e742f148cb..9ed7f74902 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -153,8 +153,12 @@ public synchronized void removeConsumer(Consumer consumer) 
throws BrokerServiceE
 if (!cursor.isDurable()) {
 // If cursor is not durable, we need to clean up the 
subscription as well
 close();
-topic.removeSubscription(subName);
-}
+
+// When the topic closes, it iterates through the concurrent-subscription map to
+// close each subscription, so removing the subscription here would re-enter the
+// same map and deadlock. Execute the removal on a different thread instead.
+topic.getBrokerService().pulsar().getExecutor().submit(() ->{
+topic.removeSubscription(subName);
+});}
 }
 
 // invalid consumer remove will throw an exception
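The pattern behind this fix — hand the map mutation off to another thread instead of mutating re-entrantly while the map is being iterated — can be sketched in isolation. This is a hypothetical, minimal model: a `ConcurrentHashMap` stands in for Pulsar's lock-striped subscription map (where the actual deadlock could occur), and the class name is invented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DeferredSubscriptionRemoval {
    static final ConcurrentHashMap<String, String> subscriptions = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        subscriptions.put("sub-1", "cursor-1");
        subscriptions.put("sub-2", "cursor-2");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Topic close: iterate to "close" every subscription; each removal is
        // deferred to the executor rather than performed inside the iteration.
        subscriptions.forEach((name, cursor) -> executor.submit(() -> subscriptions.remove(name)));

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(subscriptions.isEmpty()); // prints true
    }
}
```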


 




[GitHub] jai1 closed pull request #1718: Fix: handle invalid markDelete position at managed-cursor

2018-05-02 Thread GitBox
jai1 closed pull request #1718: Fix: handle invalid markDelete position at 
managed-cursor
URL: https://github.com/apache/incubator-pulsar/pull/1718
 
 
   


diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 22cdf3d0f4..81727841b6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1291,7 +1291,7 @@ public void asyncMarkDelete(final Position position, 
Map propertie
 final MarkDeleteCallback callback, final Object ctx) {
 checkNotNull(position);
 checkArgument(position instanceof PositionImpl);
-
+
 if (STATE_UPDATER.get(this) == State.Closed) {
 callback.markDeleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
 return;
@@ -1312,6 +1312,16 @@ public void asyncMarkDelete(final Position position, 
Map propertie
 log.debug("[{}] Mark delete cursor {} up to position: {}", 
ledger.getName(), name, position);
 }
 PositionImpl newPosition = (PositionImpl) position;
+
+if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
+if (log.isDebugEnabled()) {
+log.debug(
+"[{}] Failed mark delete due to invalid markDelete {} 
is ahead of last-confirmed-entry {} for cursor [{}]",
+ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+}
+callback.markDeleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+return;
+}
 
 lock.writeLock().lock();
 try {
@@ -1509,6 +1519,16 @@ public void asyncDelete(Position pos, final 
AsyncCallbacks.DeleteCallback callba
 ledger.getName(), name, pos, 
individualDeletedMessages, markDeletePosition, previousPosition);
 }
 
+if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
+if (log.isDebugEnabled()) {
+log.debug(
+"[{}] Failed mark delete due to invalid markDelete 
{} is ahead of last-confirmed-entry {} for cursor [{}]",
+ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+}
+callback.deleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+return;
+}
+
 if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 61e9b3da78..59e159456c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -49,6 +49,7 @@
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -2614,5 +2615,60 @@ public void operationFailed(MetaStoreException e) {
 assertEquals(entries.size(), totalAddEntries / 2);
 }
 
+@Test
+public void testInvalidMarkDelete() throws Exception {
+ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig());
+
+ManagedCursor cursor = ledger.openCursor("c1");
+Position readPosition = cursor.getReadPosition();
+Position markDeletePosition = cursor.getMarkDeletedPosition();
+
+List<Position> addedPositions = new ArrayList<>();
+for (int i = 0; i < 20; i++) {
+Position p = ledger.addEntry(("dummy-entry-" + 
i).getBytes(Encoding));
+addedPositions.add(p);
+}
+
+// validate: cursor.asyncMarkDelete(..)
+CountDownLatch markDeleteCallbackLatch = new CountDownLatch(1);
+  

[incubator-pulsar] branch branch-1.22 updated: Fix: handle invalid markDelete position at managed-cursor (#1718)

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new 1875841  Fix: handle invalid markDelete position at managed-cursor 
(#1718)
1875841 is described below

commit 187584155bcacec0900e8f6203a5f81a1e60ec7d
Author: Jai Asher 
AuthorDate: Wed May 2 13:14:48 2018 -0700

Fix: handle invalid markDelete position at managed-cursor (#1718)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 -
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 56 ++
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 22cdf3d..8172784 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1291,7 +1291,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 final MarkDeleteCallback callback, final Object ctx) {
 checkNotNull(position);
 checkArgument(position instanceof PositionImpl);
-
+
 if (STATE_UPDATER.get(this) == State.Closed) {
 callback.markDeleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
 return;
@@ -1312,6 +1312,16 @@ public class ManagedCursorImpl implements ManagedCursor {
 log.debug("[{}] Mark delete cursor {} up to position: {}", 
ledger.getName(), name, position);
 }
 PositionImpl newPosition = (PositionImpl) position;
+
+if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
+if (log.isDebugEnabled()) {
+log.debug(
+"[{}] Failed mark delete due to invalid markDelete {} 
is ahead of last-confirmed-entry {} for cursor [{}]",
+ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+}
+callback.markDeleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+return;
+}
 
 lock.writeLock().lock();
 try {
@@ -1509,6 +1519,16 @@ public class ManagedCursorImpl implements ManagedCursor {
 ledger.getName(), name, pos, 
individualDeletedMessages, markDeletePosition, previousPosition);
 }
 
+if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
+if (log.isDebugEnabled()) {
+log.debug(
+"[{}] Failed mark delete due to invalid markDelete 
{} is ahead of last-confirmed-entry {} for cursor [{}]",
+ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+}
+callback.deleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+return;
+}
+
 if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 61e9b3d..59e1594 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -2614,5 +2615,60 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 assertEquals(entries.size(), totalAddEntries / 2);
 }
 
+@Test
+public void testInvalidMarkDelete() throws Exception {
+ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig());
+
+ManagedCursor cursor = ledger.openCursor("c1");
+Position readPosition = cursor.getReadPosition();
+Position markDeletePosition = cursor.getMarkDeletedPosition();
+
+List 
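The guard these hunks add can be modeled in isolation: a requested mark-delete position that compares ahead of the ledger's last confirmed entry is rejected, while positions at or behind it are accepted. A minimal sketch, with (ledgerId, entryId) long pairs standing in for `PositionImpl` — the class and method names below are hypothetical:

```java
public class MarkDeleteGuardSketch {
    // PositionImpl-style ordering: compare by ledger id, then by entry id.
    static int compare(long ledgerA, long entryA, long ledgerB, long entryB) {
        int c = Long.compare(ledgerA, ledgerB);
        return c != 0 ? c : Long.compare(entryA, entryB);
    }

    // Mirrors the added check: fail when lastConfirmedEntry.compareTo(requested) < 0,
    // i.e. the requested mark-delete position is ahead of the last confirmed entry.
    static boolean isValidMarkDelete(long lcLedger, long lcEntry, long reqLedger, long reqEntry) {
        return compare(lcLedger, lcEntry, reqLedger, reqEntry) >= 0;
    }

    public static void main(String[] args) {
        // Last confirmed entry is (5, 10).
        System.out.println(isValidMarkDelete(5, 10, 5, 9));  // prints true
        System.out.println(isValidMarkDelete(5, 10, 6, 0));  // prints false
    }
}
```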

[GitHub] lucperkins opened a new pull request #1720: Fix broken image

2018-05-02 Thread GitBox
lucperkins opened a new pull request #1720: Fix broken image
URL: https://github.com/apache/incubator-pulsar/pull/1720
 
 
   An image in the Concepts and Architecture doc wasn't properly carried over




[GitHub] merlimat closed pull request #1717: Fix website sidebar

2018-05-02 Thread GitBox
merlimat closed pull request #1717: Fix website sidebar
URL: https://github.com/apache/incubator-pulsar/pull/1717
 
 
   


diff --git a/site/Makefile b/site/Makefile
index 9fcd9d6a18..3694bdf6de 100644
--- a/site/Makefile
+++ b/site/Makefile
@@ -63,7 +63,7 @@ travis_build: javadoc
 travis_publish:
scripts/publish-website.sh
 
-build: api_docs
+build: api_docs swagger_docs_build
scripts/build-all-versions.sh
 
 serve: clean_local
@@ -86,7 +86,7 @@ swagger_definition_copy:
(cd $(shell git rev-parse --show-toplevel) && \
cp pulsar-broker/target/docs/swagger.json 
site/_data/admin-rest-api-swagger.json)
 
-swagger_docs_update: swagger_definition_gen swagger_definition_copy
+swagger_docs_build: swagger_definition_gen swagger_definition_copy
 
 protobuf_doc_gen:
scripts/protobuf-doc-gen.sh
@@ -96,4 +96,4 @@ protobuf_setup:
 
 api_docs: javadoc python_doc_gen cpp_doc_gen
 
-publish: deep_clean setup swagger_docs_update build
+publish: deep_clean setup build
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index c145a18093..9f701bcc5c 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -82,7 +82,7 @@ groups:
 endpoint: Dashboard
   - title: Pulsar statistics
 endpoint: Stats
-  - Load distribution
+  - title: Load distribution
 endpoint: LoadDistribution
   - title: Pulsar proxy
 endpoint: Proxy


 




[incubator-pulsar] branch master updated: Fix website sidebar (#1717)

2018-05-02 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8056df5  Fix website sidebar (#1717)
8056df5 is described below

commit 8056df573514e2a34224ee0cfe7385a9fd179589
Author: Luc Perkins 
AuthorDate: Wed May 2 12:32:17 2018 -0700

Fix website sidebar (#1717)

* use proper YAML syntax

* add swagger docs build to make build command

* rename make commands
---
 site/Makefile   | 6 +++---
 site/_data/sidebar.yaml | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/site/Makefile b/site/Makefile
index 9fcd9d6..3694bdf 100644
--- a/site/Makefile
+++ b/site/Makefile
@@ -63,7 +63,7 @@ travis_build: javadoc
 travis_publish:
scripts/publish-website.sh
 
-build: api_docs
+build: api_docs swagger_docs_build
scripts/build-all-versions.sh
 
 serve: clean_local
@@ -86,7 +86,7 @@ swagger_definition_copy:
(cd $(shell git rev-parse --show-toplevel) && \
cp pulsar-broker/target/docs/swagger.json 
site/_data/admin-rest-api-swagger.json)
 
-swagger_docs_update: swagger_definition_gen swagger_definition_copy
+swagger_docs_build: swagger_definition_gen swagger_definition_copy
 
 protobuf_doc_gen:
scripts/protobuf-doc-gen.sh
@@ -96,4 +96,4 @@ protobuf_setup:
 
 api_docs: javadoc python_doc_gen cpp_doc_gen
 
-publish: deep_clean setup swagger_docs_update build
+publish: deep_clean setup build
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index c145a18..9f701bc 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -82,7 +82,7 @@ groups:
 endpoint: Dashboard
   - title: Pulsar statistics
 endpoint: Stats
-  - Load distribution
+  - title: Load distribution
 endpoint: LoadDistribution
   - title: Pulsar proxy
 endpoint: Proxy

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.


[GitHub] jai1 opened a new pull request #1718: Fix: handle invalid markDelete position at managed-cursor

2018-05-02 Thread GitBox
jai1 opened a new pull request #1718: Fix: handle invalid markDelete position 
at managed-cursor
URL: https://github.com/apache/incubator-pulsar/pull/1718
 
 
   Backporting #1554 




[GitHub] merlimat commented on issue #1717: Fix website sidebar

2018-05-02 Thread GitBox
merlimat commented on issue #1717: Fix website sidebar
URL: https://github.com/apache/incubator-pulsar/pull/1717#issuecomment-386085307
 
 
   retest this please




[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
jerrypeng commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185603311
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
 ##
 @@ -29,7 +32,9 @@
  * There is a default implementation provided for wrapping up the user 
provided {@link Sink}. Pulsar sink
  * should be implemented using this interface to ensure supporting 
effective-once.
  */
-public interface RuntimeSink extends Sink {
+//public interface RuntimeSink extends Sink {
 
 Review comment:
   yup will remove




[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
jerrypeng commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185603124
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -18,5 +18,249 @@
  */
 package org.apache.pulsar.functions.sink;
 
-public class PulsarSink {
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@Slf4j
+public class PulsarSink implements RuntimeSink {
+
+private PulsarClient client;
+private PulsarSinkConfig pulsarSinkConfig;
+private SerDe outputSerDe;
+
+private PulsarSinkProcessor pulsarSinkProcessor;
+
+private interface PulsarSinkProcessor {
+void initializeOutputProducer(String outputTopic) throws Exception;
+
+void sendOutputMessage(MessageBuilder outputMsgBuilder,
+   PulsarRecord pulsarRecord) throws Exception;
+
+void close() throws Exception;
+}
+
+private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
+private Producer producer;
+
+@Override
+public void initializeOutputProducer(String outputTopic) throws 
Exception {
+this.producer = AbstractOneOuputTopicProducers.createProducer(
+client, pulsarSinkConfig.getTopic());
+}
+
+@Override
+public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+  PulsarRecord pulsarRecord) throws 
Exception {
+Message outputMsg = outputMsgBuilder.build();
+this.producer.sendAsync(outputMsg);
+}
+
+@Override
+public void close() throws Exception {
+if (null != producer) {
+try {
+producer.close();
+} catch (PulsarClientException e) {
+log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+}
+}
+}
+}
+
+private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
+private Producer producer;
+
+@Override
+public void initializeOutputProducer(String outputTopic) throws 
Exception {
+this.producer = AbstractOneOuputTopicProducers.createProducer(
+client, pulsarSinkConfig.getTopic());
+}
+
+@Override
+public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+  PulsarRecord pulsarRecord) throws 
Exception {
+Message outputMsg = outputMsgBuilder.build();
+this.producer.sendAsync(outputMsg).thenAccept(messageId -> 
pulsarRecord.ack());
+}
+
+@Override
+public void close() throws Exception {
+if (null != producer) {
+try {
+producer.close();
+} catch (PulsarClientException e) {
+log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+}
+}
+}
+}
+
+private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor, ConsumerEventListener {
+
+@Getter(AccessLevel.PACKAGE)
+protected Producers outputProducer;
+
+@Override
+public void initializeOutputProducer(String outputTopic) throws 
Exception {
+outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic);
+outputProducer.initialize();
+}
+
+@Override
+public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
PulsarRecord pulsarRecord)
+throws Exception {
+
+// 

[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
jerrypeng commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602904
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -18,5 +18,249 @@
  */
 package org.apache.pulsar.functions.sink;
 
-public class PulsarSink {
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@Slf4j
+public class PulsarSink implements RuntimeSink {
+
+private PulsarClient client;
+private PulsarSinkConfig pulsarSinkConfig;
+private SerDe outputSerDe;
+
+private PulsarSinkProcessor pulsarSinkProcessor;
+
+private interface PulsarSinkProcessor {
+void initializeOutputProducer(String outputTopic) throws Exception;
+
+void sendOutputMessage(MessageBuilder outputMsgBuilder,
+   PulsarRecord pulsarRecord) throws Exception;
+
+void close() throws Exception;
+}
+
+private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
+private Producer producer;
+
+@Override
+public void initializeOutputProducer(String outputTopic) throws 
Exception {
+this.producer = AbstractOneOuputTopicProducers.createProducer(
+client, pulsarSinkConfig.getTopic());
+}
+
+@Override
+public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+  PulsarRecord pulsarRecord) throws 
Exception {
+Message outputMsg = outputMsgBuilder.build();
+this.producer.sendAsync(outputMsg);
+}
+
+@Override
+public void close() throws Exception {
+if (null != producer) {
+try {
+producer.close();
+} catch (PulsarClientException e) {
+log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+}
+}
+}
+}
+
+private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
 
 Review comment:
   this cannot be static. I am using the PulsarClient from the parent class
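The comment above is about a Java language rule: a non-static inner class holds an implicit reference to its enclosing instance, so it can read the outer object's fields (here, `PulsarSink`'s `PulsarClient`) directly, while a static nested class cannot. A small sketch with illustrative names:

```java
// Why the processor classes "cannot be static": the inner class reads the
// enclosing instance's field via the implicit EnclosingSketch.this reference.
// A static nested class has no enclosing instance and must be handed the
// dependency explicitly. All names here are illustrative.
class EnclosingSketch {
    private final String client = "pulsar-client";

    // Non-static inner class: can use the outer field directly.
    class InnerProcessor {
        String describe() {
            return "using " + client;
        }
    }

    // Static nested equivalent: the dependency must be injected.
    static class StaticProcessor {
        private final String client;
        StaticProcessor(String client) { this.client = client; }
        String describe() { return "using " + client; }
    }

    public static void main(String[] args) {
        EnclosingSketch outer = new EnclosingSketch();
        String fromInner = outer.new InnerProcessor().describe();
        String fromStatic = new StaticProcessor(outer.client).describe();
        if (!fromInner.equals("using pulsar-client") || !fromInner.equals(fromStatic)) {
            throw new AssertionError(fromInner + " / " + fromStatic);
        }
    }
}
```

The trade-off: the inner-class form is terser, while the static form is easier to test in isolation because its dependencies are explicit.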
   
   




[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
jerrypeng commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602874
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -18,5 +18,249 @@
  */
 package org.apache.pulsar.functions.sink;
 
-public class PulsarSink {
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@Slf4j
+public class PulsarSink implements RuntimeSink {
+
+private PulsarClient client;
+private PulsarSinkConfig pulsarSinkConfig;
+private SerDe outputSerDe;
+
+private PulsarSinkProcessor pulsarSinkProcessor;
+
+private interface PulsarSinkProcessor {
+void initializeOutputProducer(String outputTopic) throws Exception;
+
+void sendOutputMessage(MessageBuilder outputMsgBuilder,
+   PulsarRecord pulsarRecord) throws Exception;
+
+void close() throws Exception;
+}
+
+private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
 
 Review comment:
  this cannot be static. I am using the PulsarClient from the parent class




[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
jerrypeng commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602541
 
 

 ##
 File path: 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
 ##
 @@ -45,5 +45,5 @@
  * @param value output value
  * @return Completable future fo async publish request
  */
-CompletableFuture write(T value);
-}
\ No newline at end of file
+CompletableFuture write(T value) throws Exception;
 
 Review comment:
   I agree. I will change
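The agreement above concerns dropping `throws Exception` from a method that already returns a `CompletableFuture`: the usual convention is that an async `write` surfaces failures through the returned future, so callers handle errors in one place. A hedged sketch of that convention (`AsyncWriteSketch` is illustrative, not part of connect-core):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the convention behind the review: an async write should report
// failures via the returned future rather than a checked throws clause.
class AsyncWriteSketch {
    static CompletableFuture<Void> write(String value) {
        try {
            if (value == null) {
                throw new IllegalArgumentException("value must not be null");
            }
            // ... hand off to a producer here ...
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            // Convert synchronous failures into a failed future so every
            // caller sees errors through the same callback path.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        if (!write("ok").isDone()) {
            throw new AssertionError("successful write should complete");
        }
        if (!write(null).isCompletedExceptionally()) {
            throw new AssertionError("failure should surface via the future");
        }
    }
}
```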




[GitHub] lucperkins commented on issue #1717: Fix website sidebar

2018-05-02 Thread GitBox
lucperkins commented on issue #1717: Fix website sidebar
URL: https://github.com/apache/incubator-pulsar/pull/1717#issuecomment-386073972
 
 
   @merlimat I made some additional changes that will fix the issue with the 
REST API docs not being generated.




[GitHub] lucperkins opened a new pull request #1717: Fix website sidebar

2018-05-02 Thread GitBox
lucperkins opened a new pull request #1717: Fix website sidebar
URL: https://github.com/apache/incubator-pulsar/pull/1717
 
 
  The website build is currently broken due to a previous PR. This change restores 
proper YAML syntax in the sidebar definition.




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185578889
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##
 @@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+@Data
+class MockLedgerEntry implements LedgerEntry {
+public byte blockPadding = 0xB;
+long ledgerId;
+long entryId;
+long length;
+byte entryBytes[];
+ByteBuf entryBuffer;
+
+MockLedgerEntry(long ledgerId, long entryId, long length) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.length = length;
+this.entryBytes = new byte[(int)length];
+entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+entryBuffer.writerIndex(0);
+IntStream.range(0, (int)length).forEach(i -> 
entryBuffer.writeByte(blockPadding));
+}
+
+@Override
+public ByteBuffer getEntryNioBuffer() {
+return null;
+}
+
+@Override
+public LedgerEntry duplicate() {
+return null;
+}
+
+@Override
+public void close() {
+entryBuffer.release();
+}
+}
+
+@Data
+class MockLedgerEntries implements LedgerEntries {
+int ledgerId;
+int startEntryId;
+int count;
+int entrySize;
+List<LedgerEntry> entries;
+
+MockLedgerEntries(int ledgerId, int startEntryId, int count, int 
entrySize) {
+this.ledgerId = ledgerId;
+this.startEntryId = startEntryId;
+this.count = count;
+this.entrySize = entrySize;
+this.entries = Lists.newArrayList(count);
+
+IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+}
+
+@Override
+public void close() {
+entries.clear();
+}
+
+@Override
+public LedgerEntry getEntry(long entryId) {
+if (entryId < startEntryId || entryId >= startEntryId + count) {
+return null;
+}
+
+return entries.get(((int)entryId - startEntryId));
+}
+
+@Override
+public Iterator<LedgerEntry> iterator() {
+return entries.iterator();
+}
+}
+
+class MockReadHandle implements ReadHandle {
+int ledgerId;
+int entrySize;
+int lac;
+MockReadHandle(int ledgerId, int entrySize, int lac) {
+this.ledgerId = ledgerId;
+this.entrySize = entrySize;
+this.lac = lac;
+  

[incubator-pulsar] branch master updated: Add release notes page in website (#1705)

2018-05-02 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4fff8d6  Add release notes page in website (#1705)
4fff8d6 is described below

commit 4fff8d6827544772ed38c0c6f5dcc09a3f7268e4
Author: Matteo Merli 
AuthorDate: Wed May 2 10:35:44 2018 -0700

Add release notes page in website (#1705)
---
 site/download.md  |   4 +-
 site/release-notes.md | 363 ++
 2 files changed, 365 insertions(+), 2 deletions(-)

diff --git a/site/download.md b/site/download.md
index 9aab82f..94d45e8 100644
--- a/site/download.md
+++ b/site/download.md
@@ -46,9 +46,9 @@ Pulsar's Release Managers. We also provide `MD5` and 
`SHA-512` checksums for eve
 After you download the file, you should calculate a checksum for your 
download, and make sure it is
 the same as ours.
 
-### Release notes for the {{ site.current_version }} release
+### Release notes
 
-[https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}](https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}})
+[Release notes](../release-notes) for all Pulsar versions
 
 ### Getting started
 
diff --git a/site/release-notes.md b/site/release-notes.md
new file mode 100644
index 000..629851a
--- /dev/null
+++ b/site/release-notes.md
@@ -0,0 +1,363 @@
+---
+title: Apache Pulsar Release Notes
+layout: content
+---
+
+
+
+## Apache incubator
+
+### 1.22.0-incubating  2018-03-06
+
+This is the fourth release of Apache Pulsar since entering the ASF incubator.
+
+Major changes in this release include:
+
+ Features
+ * [#896](https://github.com/apache/incubator-pulsar/pull/896) PIP-7 Introduce 
Failure-domain and Anti-affinity-namespace group
+ * [#1031](https://github.com/apache/incubator-pulsar/pull/1031) Add optional 
key/value metadata to producers/consumers
+ * [#1129](https://github.com/apache/incubator-pulsar/pull/1129) Added end to 
end encryption in C++ client
+ * [#1151](https://github.com/apache/incubator-pulsar/pull/1151) Added REST 
handler to create a subscription on a topic
+ * [#1087](https://github.com/apache/incubator-pulsar/pull/1087) Add basic 
authentication plugin
+ * [#1200](https://github.com/apache/incubator-pulsar/pull/1200) Add pluggable 
authorization mechanism
+ * [#1208](https://github.com/apache/incubator-pulsar/pull/1208) Add 
hostname-verification at client tls connection
+ * [#950](https://github.com/apache/incubator-pulsar/pull/950) Provided a 
DCOS Universe package for Pulsar
+ * [#1046](https://github.com/apache/incubator-pulsar/pull/1046) Introduce 
config to skip non-recoverable data-ledger
+ * [#899](https://github.com/apache/incubator-pulsar/pull/899) Add 
subscription auth mode by prefix
+ * [#1135](https://github.com/apache/incubator-pulsar/pull/1135) Added 
infinite time retention configuration option
+
+ Enhancements
+
+ * [#1094](https://github.com/apache/incubator-pulsar/pull/1094) Include 
BoringSSL native implementation for faster TLS
+ * [#1204](https://github.com/apache/incubator-pulsar/pull/1204) Reduce size 
of buffer used to assemble batches
+ * [#930](https://github.com/apache/incubator-pulsar/pull/930) Perform async 
DNS resolution
+ * [#1124](https://github.com/apache/incubator-pulsar/pull/1124) Support 
Pulsar proxy from C++/Python client library
+ * [#1012](https://github.com/apache/incubator-pulsar/pull/1012) Made load 
shedding for load manager Dynamically configurable
+ * [#962](https://github.com/apache/incubator-pulsar/pull/962) Raw Reader for 
Pulsar Topics
+ * [#941](https://github.com/apache/incubator-pulsar/pull/941) Upgraded 
Jackson version
+ * [#1002](https://github.com/apache/incubator-pulsar/pull/1002), 
[#1169](https://github.com/apache/incubator-pulsar/pull/1169), 
[#1168](https://github.com/apache/incubator-pulsar/pull/1168) Making Pulsar 
Proxy more secure
+ * [#1029](https://github.com/apache/incubator-pulsar/pull/1029) Fix 
MessageRouter hash inconsistent on C++/Java client
+
+ Fixes
+
+ * [#1153](https://github.com/apache/incubator-pulsar/pull/1153) Fixed 
increase partitions on a partitioned topic
+ * [#1195](https://github.com/apache/incubator-pulsar/pull/1195) Ensure the 
checksum is not stripped after validation in the broker
+ * [#1203](https://github.com/apache/incubator-pulsar/pull/1203) Use 
duplicates when writing from ByteBuf pair to avoid multiple threads issues
+ * [#1210](https://github.com/apache/incubator-pulsar/pull/1210) Cancel 
keep-alive timer task after the proxy switch to TCP proxy
+ * [#1170](https://github.com/apache/incubator-pulsar/pull/1170) Upgrade BK 
version: BK-4.3.1.91-yahoo (fix: stats + DoubleByteBuf)
+ * [#875](https://github.com/apache/incubator-pulsar/pull/875) Bug fixes for 
Websocket proxy
+
+The complete list of changes can be 

[GitHub] merlimat closed pull request #1705: Add release notes page in website

2018-05-02 Thread GitBox
merlimat closed pull request #1705: Add release notes page in website
URL: https://github.com/apache/incubator-pulsar/pull/1705
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/site/download.md b/site/download.md
index 9aab82fea0..94d45e837d 100644
--- a/site/download.md
+++ b/site/download.md
@@ -46,9 +46,9 @@ Pulsar's Release Managers. We also provide `MD5` and 
`SHA-512` checksums for eve
 After you download the file, you should calculate a checksum for your 
download, and make sure it is
 the same as ours.
 
-### Release notes for the {{ site.current_version }} release
+### Release notes
 
-[https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}](https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}})
+[Release notes](../release-notes) for all Pulsar versions
 
 ### Getting started
 
diff --git a/site/release-notes.md b/site/release-notes.md
new file mode 100644
index 00..629851a0b9
--- /dev/null
+++ b/site/release-notes.md
@@ -0,0 +1,363 @@
+---
+title: Apache Pulsar Release Notes
+layout: content
+---
+
+
+
+## Apache incubator
+
+### 1.22.0-incubating  2018-03-06
+
+This is the fourth release of Apache Pulsar since entering the ASF incubator.
+
+Major changes in this release include:
+
+ Features
+ * [#896](https://github.com/apache/incubator-pulsar/pull/896) PIP-7 Introduce 
Failure-domain and Anti-affinity-namespace group
+ * [#1031](https://github.com/apache/incubator-pulsar/pull/1031) Add optional 
key/value metadata to producers/consumers
+ * [#1129](https://github.com/apache/incubator-pulsar/pull/1129) Added end to 
end encryption in C++ client
+ * [#1151](https://github.com/apache/incubator-pulsar/pull/1151) Added REST 
handler to create a subscription on a topic
+ * [#1087](https://github.com/apache/incubator-pulsar/pull/1087) Add basic 
authentication plugin
+ * [#1200](https://github.com/apache/incubator-pulsar/pull/1200) Add pluggable 
authorization mechanism
+ * [#1208](https://github.com/apache/incubator-pulsar/pull/1208) Add 
hostname-verification at client tls connection
+ * [#950](https://github.com/apache/incubator-pulsar/pull/950) Provided a 
DCOS Universe package for Pulsar
+ * [#1046](https://github.com/apache/incubator-pulsar/pull/1046) Introduce 
config to skip non-recoverable data-ledger
+ * [#899](https://github.com/apache/incubator-pulsar/pull/899) Add 
subscription auth mode by prefix
+ * [#1135](https://github.com/apache/incubator-pulsar/pull/1135) Added 
infinite time retention configuration option
+
+ Enhancements
+
+ * [#1094](https://github.com/apache/incubator-pulsar/pull/1094) Include 
BoringSSL native implementation for faster TLS
+ * [#1204](https://github.com/apache/incubator-pulsar/pull/1204) Reduce size 
of buffer used to assemble batches
+ * [#930](https://github.com/apache/incubator-pulsar/pull/930) Perform async 
DNS resolution
+ * [#1124](https://github.com/apache/incubator-pulsar/pull/1124) Support 
Pulsar proxy from C++/Python client library
+ * [#1012](https://github.com/apache/incubator-pulsar/pull/1012) Made load 
shedding for load manager Dynamically configurable
+ * [#962](https://github.com/apache/incubator-pulsar/pull/962) Raw Reader for 
Pulsar Topics
+ * [#941](https://github.com/apache/incubator-pulsar/pull/941) Upgraded 
Jackson version
+ * [#1002](https://github.com/apache/incubator-pulsar/pull/1002), 
[#1169](https://github.com/apache/incubator-pulsar/pull/1169), 
[#1168](https://github.com/apache/incubator-pulsar/pull/1168) Making Pulsar 
Proxy more secure
+ * [#1029](https://github.com/apache/incubator-pulsar/pull/1029) Fix 
MessageRouter hash inconsistent on C++/Java client
+
+ Fixes
+
+ * [#1153](https://github.com/apache/incubator-pulsar/pull/1153) Fixed 
increase partitions on a partitioned topic
+ * [#1195](https://github.com/apache/incubator-pulsar/pull/1195) Ensure the 
checksum is not stripped after validation in the broker
+ * [#1203](https://github.com/apache/incubator-pulsar/pull/1203) Use 
duplicates when writing from ByteBuf pair to avoid multiple threads issues
+ * [#1210](https://github.com/apache/incubator-pulsar/pull/1210) Cancel 
keep-alive timer task after the proxy switch to TCP proxy
+ * [#1170](https://github.com/apache/incubator-pulsar/pull/1170) Upgrade BK 
version: BK-4.3.1.91-yahoo (fix: stats + DoubleByteBuf)
+ * [#875](https://github.com/apache/incubator-pulsar/pull/875) Bug fixes for 
Websocket proxy
+
+The complete list of changes can be found at:
+https://github.com/apache/incubator-pulsar/milestone/11?closed=1
+
+https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating
+
+### 1.21.0-incubating  2017-12-17
+
+This is the third release of Apache Pulsar since entering the ASF incubator.
+
+Major 

[GitHub] sijie closed pull request #1687: Don't offload empty ledgers

2018-05-02 Thread GitBox
sijie closed pull request #1687: Don't offload empty ledgers
URL: https://github.com/apache/incubator-pulsar/pull/1687
 
 
   


diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f1d71007a4..1a606b7d4a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1966,7 +1966,8 @@ public void asyncOffloadPrefix(Position pos, 
OffloadCallback callback, Object ct
 long firstLedgerRetained = current;
 for (LedgerInfo ls : ledgers.headMap(current).values()) {
 if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
-if (!ls.getOffloadContext().getComplete()) {
+// don't offload if ledger has already been offloaded, or 
is empty
+if (!ls.getOffloadContext().getComplete() && ls.getSize() 
> 0) {
 ledgersToOffload.add(ls);
 }
 } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 278182cef3..76cbbb6de5 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -20,6 +20,7 @@
 
 import com.google.common.collect.ImmutableSet;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +41,7 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -633,6 +635,47 @@ public void testOffloadDeleteIncomplete() throws Exception 
{
 assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedger));
 }
 
+@Test
+public void testDontOffloadEmpty() throws Exception {
+MockLedgerOffloader offloader = new MockLedgerOffloader();
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setMaxEntriesPerLedger(10);
+config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+config.setRetentionTime(10, TimeUnit.MINUTES);
+config.setLedgerOffloader(offloader);
+ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+int i = 0;
+for (; i < 35; i++) {
+String content = "entry-" + i;
+ledger.addEntry(content.getBytes());
+}
+Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+
+long firstLedgerId = 
ledger.getLedgersInfoAsList().get(0).getLedgerId();
+long secondLedgerId = 
ledger.getLedgersInfoAsList().get(1).getLedgerId();
+long thirdLedgerId = 
ledger.getLedgersInfoAsList().get(2).getLedgerId();
+long fourthLedgerId = 
ledger.getLedgersInfoAsList().get(3).getLedgerId();
+
+// make an ledger empty
+Field ledgersField = ledger.getClass().getDeclaredField("ledgers");
+ledgersField.setAccessible(true);
+Map<Long, LedgerInfo> ledgers = 
(Map<Long, LedgerInfo>) ledgersField.get(ledger);
+ledgers.put(secondLedgerId,
+
ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build());
+
+PositionImpl firstUnoffloaded = 
(PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId);
+Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+.filter(e -> e.getOffloadContext().getComplete())
+.map(e -> 
e.getLedgerId()).collect(Collectors.toSet()),
+offloader.offloadedLedgers());
+Assert.assertEquals(offloader.offloadedLedgers(), 
ImmutableSet.of(firstLedgerId, thirdLedgerId));
+}
+
 static void assertEventuallyTrue(BooleanSupplier predicate) throws 
Exception {
 // wait up to 3 seconds
 for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {


 


[incubator-pulsar] branch master updated: Don't offload empty ledgers (#1687)

2018-05-02 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b982943  Don't offload empty ledgers (#1687)
b982943 is described below

commit b982943c77972211c937270d305d14f9d12d5f84
Author: Ivan Kelly 
AuthorDate: Wed May 2 19:10:25 2018 +0200

Don't offload empty ledgers (#1687)

It shouldn't be possible for a ledger in a managed ledger to be
empty (it should be cleaned up on recovery), but this patch adds
defensive code so that if they do exist for some reason, they won't be
offloaded.

Master Issue: #1511
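The defensive check this commit describes can be modeled in isolation: only ledgers that are non-empty and not yet offloaded get collected for offload. A minimal sketch; `LedgerMeta` and the method names are illustrative stand-ins, not the real managed-ledger API:

```java
import java.util.ArrayList;
import java.util.List;

public class OffloadFilterSketch {
    // Illustrative stand-in for the LedgerInfo protobuf used by ManagedLedgerImpl.
    static class LedgerMeta {
        final long ledgerId;
        final long size;
        final boolean offloadComplete;
        LedgerMeta(long ledgerId, long size, boolean offloadComplete) {
            this.ledgerId = ledgerId;
            this.size = size;
            this.offloadComplete = offloadComplete;
        }
    }

    // Collect ledgers strictly before requestOffloadTo, skipping ledgers that
    // are already offloaded or empty (the defensive check added by the patch).
    static List<Long> ledgersToOffload(List<LedgerMeta> ledgers, long requestOffloadToLedgerId) {
        List<Long> toOffload = new ArrayList<>();
        for (LedgerMeta ls : ledgers) {
            if (requestOffloadToLedgerId > ls.ledgerId) {
                if (!ls.offloadComplete && ls.size > 0) {
                    toOffload.add(ls.ledgerId);
                }
            }
        }
        return toOffload;
    }

    public static void main(String[] args) {
        List<LedgerMeta> ledgers = List.of(
                new LedgerMeta(1, 100, false),  // eligible
                new LedgerMeta(2, 0, false),    // empty: skipped
                new LedgerMeta(3, 100, true),   // already offloaded: skipped
                new LedgerMeta(4, 100, false)); // not before requestOffloadTo: skipped
        System.out.println(ledgersToOffload(ledgers, 4)); // prints [1]
    }
}
```

This mirrors why the test below expects only the first and third ledgers to be offloaded once the second is forced empty.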
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 43 ++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f1d7100..1a606b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1966,7 +1966,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 long firstLedgerRetained = current;
 for (LedgerInfo ls : ledgers.headMap(current).values()) {
 if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
-if (!ls.getOffloadContext().getComplete()) {
+// don't offload if ledger has already been offloaded, or is empty
+if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) {
 ledgersToOffload.add(ls);
 }
 } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 278182c..76cbbb6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.ImmutableSet;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -633,6 +635,47 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
 }
 
+@Test
+public void testDontOffloadEmpty() throws Exception {
+MockLedgerOffloader offloader = new MockLedgerOffloader();
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setMaxEntriesPerLedger(10);
+config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+config.setRetentionTime(10, TimeUnit.MINUTES);
+config.setLedgerOffloader(offloader);
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+int i = 0;
+for (; i < 35; i++) {
+String content = "entry-" + i;
+ledger.addEntry(content.getBytes());
+}
+Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+
+long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId();
+long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId();
+
+// make a ledger empty
+Field ledgersField = ledger.getClass().getDeclaredField("ledgers");
+ledgersField.setAccessible(true);
+Map<Long, LedgerInfo> ledgers = (Map<Long, LedgerInfo>) ledgersField.get(ledger);
+ledgers.put(secondLedgerId,
+ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build());
+
+PositionImpl firstUnoffloaded = (PositionImpl) ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId);
+Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185564229
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##
 @@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStreamImpl for each cold storage data block.
+ * It reads entries from the ledger and streams out the content of one data block:
+ * DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]]) + padding.
+ */
+public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream {
+private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
+
+private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries read from the ledger and streamed out from this InputStream.
+private int blockEntryCount;
+
+// tracking read status for both header and entries.
+// Bytes that have already been read from this InputStream
+private int bytesReadOffset = 0;
+// Bytes from this index onward are all padding bytes
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries to read from the ReadHandle each time.
+private static final int ENTRIES_PER_READ = 100;
+// buffer for the entry size and entry id.
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+// Keep a list of all entries' ByteBuf; each ByteBuf contains 2 bufs: entry header and entry content.
+private List<ByteBuf> entriesByteBuf = null;
+
+public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream();
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < blockSize);
+
+// once reach the end of entry buffer, start a new read.
+if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
 
 Review comment:
   it could be simplified further if the entriesByteBuf cleanup is left until close.
   
   ```
   if (entriesByteBuf.isEmpty()) {
       entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
   }
   if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
       // not able to place a new Entry.
       dataBlockFullOffset = bytesReadOffset;

       return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % 4];
   } else {
       ...
   ```
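The padding indexing in the suggestion above, `BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % 4]`, can be sanity-checked in isolation. A minimal sketch under the assumption that `BLOCK_END_PADDING` is the big-endian byte layout of `0xFEDCDEAD` (which is what Guava's `Ints.toByteArray` produces); the `pad` helper is illustrative, not part of the PR:

```java
import java.io.ByteArrayOutputStream;

public class PaddingSketch {
    // Big-endian bytes of 0xFEDCDEAD, same value as in BlockAwareSegmentInputStreamImpl.
    static final byte[] BLOCK_END_PADDING = {(byte) 0xFE, (byte) 0xDC, (byte) 0xDE, (byte) 0xAD};

    // Emit padding bytes from dataBlockFullOffset up to blockSize,
    // cycling through the 4-byte padding pattern exactly as the read path does.
    static byte[] pad(int dataBlockFullOffset, int blockSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int off = dataBlockFullOffset; off < blockSize; off++) {
            out.write(BLOCK_END_PADDING[(off - dataBlockFullOffset) % 4]);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] p = pad(10, 16); // 6 padding bytes
        StringBuilder sb = new StringBuilder();
        for (byte b : p) sb.append(String.format("%02X", b & 0xFF));
        System.out.println(sb); // prints FEDCDEADFEDC
    }
}
```

The `% 4` cycle means the padding pattern restarts at each block's fill point rather than being aligned to the block, which is why the first padding byte is always `0xFE`.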


This is an automated message from the Apache Git 

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185561252
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##
 @@ -0,0 +1,217 @@
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStreamImpl for each cold storage data block.
+ * It reads entries from the ledger and streams out the content of one data block:
+ * DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]]) + padding.
+ */
+public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream {
+private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
+
+private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries read from the ledger and streamed out from this InputStream.
+private int blockEntryCount;
+
+// tracking read status for both header and entries.
+// Bytes that have already been read from this InputStream
+private int bytesReadOffset = 0;
+// Bytes from this index onward are all padding bytes
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries to read from the ReadHandle each time.
+private static final int ENTRIES_PER_READ = 100;
+// buffer for the entry size and entry id.
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+// Keep a list of all entries' ByteBuf; each ByteBuf contains 2 bufs: entry header and entry content.
+private List<ByteBuf> entriesByteBuf = null;
+
+public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream();
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < blockSize);
+
+// once reach the end of entry buffer, start a new read.
+if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
+// not able to place a new Entry.
+entriesByteBuf.forEach(buf -> buf.release());
+entriesByteBuf.clear();
+dataBlockFullOffset = bytesReadOffset;
+}
+}
+
+if (bytesReadOffset < dataBlockFullOffset) {
+// always read from the first ByteBuf in the list, once read all of its content remove it.
+ByteBuf entryByteBuf = 

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185566416
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##
 @@ -0,0 +1,396 @@
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+@Data
+class MockLedgerEntry implements LedgerEntry {
+public byte blockPadding = 0xB;
+long ledgerId;
+long entryId;
+long length;
+byte entryBytes[];
+ByteBuf entryBuffer;
+
+MockLedgerEntry(long ledgerId, long entryId, long length) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.length = length;
+this.entryBytes = new byte[(int)length];
+entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+entryBuffer.writerIndex(0);
+IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding));
+}
+
+@Override
+public ByteBuffer getEntryNioBuffer() {
+return null;
+}
+
+@Override
+public LedgerEntry duplicate() {
+return null;
+}
+
+@Override
+public void close() {
+entryBuffer.release();
+}
+}
+
+@Data
+class MockLedgerEntries implements LedgerEntries {
+int ledgerId;
+int startEntryId;
+int count;
+int entrySize;
+List<LedgerEntry> entries;
+
+MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) {
+this.ledgerId = ledgerId;
+this.startEntryId = startEntryId;
+this.count = count;
+this.entrySize = entrySize;
+this.entries = Lists.newArrayList(count);
+
+IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+}
+
+@Override
+public void close() {
+entries.clear();
+}
+
+@Override
+public LedgerEntry getEntry(long entryId) {
+if (entryId < startEntryId || entryId >= startEntryId + count) {
+return null;
+}
+
+return entries.get(((int)entryId - startEntryId));
+}
+
+@Override
+public Iterator<LedgerEntry> iterator() {
+return entries.iterator();
+}
+}
+
+class MockReadHandle implements ReadHandle {
+int ledgerId;
+int entrySize;
+int lac;
+MockReadHandle(int ledgerId, int entrySize, int lac) {
+this.ledgerId = ledgerId;
+this.entrySize = entrySize;
+this.lac = lac;
+  

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185568883
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185560570
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185557470
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##
 @@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStreamImpl for each cold storage data block.
+ * It gets data from ledger, and will be read out the content for a data block.
+ * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- 
long][entry_data]]) + padding
+ */
+public class BlockAwareSegmentInputStreamImpl extends InputStream implements 
BlockAwareSegmentInputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
+
+private static final byte[] BLOCK_END_PADDING = 
Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of Message entries that read from ledger and been readout from 
this InputStream.
+private int blockEntryCount;
+
+// tracking read status for both header and entries.
+// Bytes that already been read from this InputStream
+private int bytesReadOffset = 0;
+// Byte from this index is all padding byte
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries want to read from ReadHandle each time.
+private static final int ENTRIES_PER_READ = 100;
+// buf the entry size and entry id.
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* 
entry id */;
+// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry 
header and entry content.
+private List entriesByteBuf = null;
+
+public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long 
startEntryId, int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < blockSize);
+
+// once we reach the end of the entry buffer, start a new read.
+if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
+// not able to place a new Entry.
+entriesByteBuf.forEach(buf -> buf.release());
+entriesByteBuf.clear();
+dataBlockFullOffset = bytesReadOffset;
+}
+}
+
+if (bytesReadOffset < dataBlockFullOffset) {
+// always read from the first ByteBuf in the list; once all of its content has been read, remove it.
+ByteBuf entryByteBuf = 
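The quoted snippet is cut off by the mail archive at this point. The per-entry layout it writes, [entry_size -- int][entry_id -- long][entry_data], can be sketched standalone with java.nio; the class and method names below are illustrative, not taken from the PR:

```java
import java.nio.ByteBuffer;

public class EntryFraming {
    // Layout of one entry inside a data block, per the javadoc above:
    //   [entry_size -- 4-byte int][entry_id -- 8-byte long][entry_data]
    static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;

    // Hypothetical helper, not part of BlockAwareSegmentInputStreamImpl.
    static byte[] frame(long entryId, byte[] entryData) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_HEADER_SIZE + entryData.length);
        buf.putInt(entryData.length); // entry_size (big-endian, ByteBuffer default)
        buf.putLong(entryId);         // entry_id
        buf.put(entryData);           // entry_data
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(7L, new byte[]{1, 2, 3});
        System.out.println(framed.length); // prints 15 (12-byte header + 3 payload bytes)
    }
}
```

The 12-byte header is why ENTRY_HEADER_SIZE above is defined as 4 + 8.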

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185562881
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
 ##
 @@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStreamImpl for each cold storage data block.
+ * It reads data from the ledger and exposes it as the content of one data block:
+ * DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]]) + padding
+ */
+public class BlockAwareSegmentInputStreamImpl extends InputStream implements 
BlockAwareSegmentInputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
+
+private static final byte[] BLOCK_END_PADDING = 
Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries that have been read from the ledger and read out from this InputStream.
+private int blockEntryCount;
+
+// tracking read status for both header and entries.
+// Bytes that have already been read from this InputStream
+private int bytesReadOffset = 0;
+// Bytes from this index onward are all padding bytes
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries to read from the ReadHandle each time.
+private static final int ENTRIES_PER_READ = 100;
+// buffer for the entry size and entry id.
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+// Keep a list of all entries' ByteBufs; each ByteBuf is composed of 2 bufs: the entry header and the entry content.
+private List<ByteBuf> entriesByteBuf = null;
+
+public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long 
startEntryId, int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < blockSize);
+
+// once we reach the end of the entry buffer, start a new read.
+if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
 
 Review comment:
   moving the inner check out of the outer check will remove code duplication
   
   ```
if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
    entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
}
if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
    // not able to place a new Entry.
    entriesByteBuf.forEach(buf -> buf.release());
    entriesByteBuf.clear();
    dataBlockFullOffset = bytesReadOffset;

    return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % 4];
} else if 
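The suggested rewrite above is truncated mid-branch by the archive. The padding return it shows cycles through the 4-byte BLOCK_END_PADDING pattern (Ints.toByteArray(0xFEDCDEAD) in the quoted class). A minimal standalone sketch of that cycling; the names here are stand-ins for the class's fields, not its actual code:

```java
public class PaddingSketch {
    // Big-endian bytes of 0xFEDCDEAD, matching Ints.toByteArray(0xFEDCDEAD).
    static final byte[] BLOCK_END_PADDING = {(byte) 0xFE, (byte) 0xDC, (byte) 0xDE, (byte) 0xAD};

    // Once the block is full at dataBlockFullOffset, every later offset maps
    // onto the 4-byte padding pattern, cycling by (offset - fullOffset) % 4.
    static byte paddingByteAt(int bytesReadOffset, int dataBlockFullOffset) {
        return BLOCK_END_PADDING[(bytesReadOffset - dataBlockFullOffset) % 4];
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (int off = 100; off < 108; off++) {
            hex.append(String.format("%02X", paddingByteAt(off, 100)));
        }
        System.out.println(hex); // prints FEDCDEADFEDCDEAD
    }
}
```

This is why the padding region needs no buffer of its own: any offset past dataBlockFullOffset is computed on the fly.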

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185565049
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##
 @@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+@Data
+class MockLedgerEntry implements LedgerEntry {
+public byte blockPadding = 0xB;
+long ledgerId;
+long entryId;
+long length;
+byte entryBytes[];
+ByteBuf entryBuffer;
+
+MockLedgerEntry(long ledgerId, long entryId, long length) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.length = length;
+this.entryBytes = new byte[(int)length];
+entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+entryBuffer.writerIndex(0);
+IntStream.range(0, (int)length).forEach(i -> 
entryBuffer.writeByte(blockPadding));
+}
+
+@Override
+public ByteBuffer getEntryNioBuffer() {
+return null;
+}
+
+@Override
+public LedgerEntry duplicate() {
+return null;
+}
+
+@Override
+public void close() {
+entryBuffer.release();
+}
+}
+
+@Data
+class MockLedgerEntries implements LedgerEntries {
+int ledgerId;
+int startEntryId;
+int count;
+int entrySize;
List<LedgerEntry> entries;
+
+MockLedgerEntries(int ledgerId, int startEntryId, int count, int 
entrySize) {
+this.ledgerId = ledgerId;
+this.startEntryId = startEntryId;
+this.count = count;
+this.entrySize = entrySize;
+this.entries = Lists.newArrayList(count);
+
+IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+}
+
+@Override
+public void close() {
+entries.clear();
+}
+
+@Override
+public LedgerEntry getEntry(long entryId) {
+if (entryId < startEntryId || entryId >= startEntryId + count) {
+return null;
+}
+
+return entries.get(((int)entryId - startEntryId));
+}
+
+@Override
+public Iterator<LedgerEntry> iterator() {
+return entries.iterator();
+}
+}
+
+class MockReadHandle implements ReadHandle {
+int ledgerId;
+int entrySize;
+int lac;
+MockReadHandle(int ledgerId, int entrySize, int lac) {
+this.ledgerId = ledgerId;
+this.entrySize = entrySize;
+this.lac = lac;
+  

[GitHub] sijie commented on issue #1669: PIP-17: provide DataBlockHeader and implementation

2018-05-02 Thread GitBox
sijie commented on issue #1669: PIP-17: provide DataBlockHeader and 
implementation
URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-386049447
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: PIP-17: the part of index block for offload. (#1593)

2018-05-02 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1404944  PIP-17:  the part of index block for offload. (#1593)
1404944 is described below

commit 14049448e5d4170b7ad90fefb8568eb265f278e7
Author: Jia Zhai 
AuthorDate: Thu May 3 01:03:02 2018 +0800

PIP-17:  the part of index block for offload. (#1593)

* add offload index block

* change follow ivan's comments

* change to use api.LedgerMetadata

* change following @sijie's comments

* avoid using org.apache.bookkeeper.client.LedgerMetadata serialization

* change following @ivan's comments
---
 .../pulsar/broker/s3offload/OffloadIndexBlock.java |  63 
 .../broker/s3offload/OffloadIndexBlockBuilder.java |  72 +
 .../pulsar/broker/s3offload/OffloadIndexEntry.java |  50 +++
 .../impl/OffloadIndexBlockBuilderImpl.java |  77 +
 .../s3offload/impl/OffloadIndexBlockImpl.java  | 337 +
 .../s3offload/impl/OffloadIndexEntryImpl.java  |  58 
 .../apache/pulsar/s3offload/OffloadIndexTest.java  | 237 +++
 7 files changed, 894 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
new file mode 100644
index 000..8f9d3ce
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ *
+ * The index block abstraction used for offloading a ledger to long-term storage.
+ *
+ */
+@Unstable
+public interface OffloadIndexBlock extends Closeable {
+
+/**
+ * Get the content of the index block as InputStream.
+ * Read out in format:
+ *   | index_magic_header | index_block_len | index_entry_count |
+ *   |segment_metadata_length | segment metadata | index entries |
+ */
+InputStream toStream() throws IOException;
+
+/**
+ * Get the related OffloadIndexEntry that contains the given 
messageEntryId.
+ *
+ * @param messageEntryId
+ *  the entry id of message
+ * @return the offload index entry
+ */
+OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws 
IOException;
+
+/**
+ * Get the entry count contained in this index block.
+ */
+int getEntryCount();
+
+/**
+ * Get LedgerMetadata.
+ */
+LedgerMetadata getLedgerMetadata();
+
+}
+
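The toStream() javadoc above fixes the order of the leading fields (| index_magic_header | index_block_len | index_entry_count | segment_metadata_length | …). A minimal sketch of parsing them from the stream; the 4-byte width of every field and the class name are assumptions for illustration, since the javadoc specifies only the field order, not the encoding:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class IndexHeaderSketch {
    // Reads index_magic_header, index_block_len, index_entry_count, and
    // segment_metadata_length from the front of an index block stream.
    // Assumes each field is a 4-byte big-endian int (hypothetical widths).
    static int[] readHeader(byte[] block) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(block))) {
            return new int[]{in.readInt(), in.readInt(), in.readInt(), in.readInt()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] block = {0, 0, 0, 1,  0, 0, 0, 2,  0, 0, 0, 3,  0, 0, 0, 4};
        int[] h = readHeader(block);
        System.out.println(h[2]); // prints 3 (the index_entry_count field)
    }
}
```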
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
new file mode 100644
index 000..8ec0395
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

[GitHub] sijie closed pull request #1593: PIP-17: the part of index block for offload.

2018-05-02 Thread GitBox
sijie closed pull request #1593: PIP-17:  the part of index block for offload.
URL: https://github.com/apache/incubator-pulsar/pull/1593
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
new file mode 100644
index 00..8f9d3cecb1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ *
+ * The index block abstraction used for offloading a ledger to long-term storage.
+ *
+ */
+@Unstable
+public interface OffloadIndexBlock extends Closeable {
+
+/**
+ * Get the content of the index block as InputStream.
+ * Read out in format:
+ *   | index_magic_header | index_block_len | index_entry_count |
+ *   |segment_metadata_length | segment metadata | index entries |
+ */
+InputStream toStream() throws IOException;
+
+/**
+ * Get the related OffloadIndexEntry that contains the given 
messageEntryId.
+ *
+ * @param messageEntryId
+ *  the entry id of message
+ * @return the offload index entry
+ */
+OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws 
IOException;
+
+/**
+ * Get the entry count contained in this index block.
+ */
+int getEntryCount();
+
+/**
+ * Get LedgerMetadata.
+ */
+LedgerMetadata getLedgerMetadata();
+
+}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
new file mode 100644
index 00..8ec0395498
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import 
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl;
+
+/**
+ * Interface for a builder of the index block used for offloading a ledger to long-term storage.
+ */
+@Unstable
+@LimitedPrivate
+public interface OffloadIndexBlockBuilder {
+
+/**
+ * Build the index block with the passed-in ledger metadata.
+ *
+ * @param metadata the ledger metadata
+ */
+OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);
+
+/**
+ * Add one payload block's related information into the index block.
+ * It contains the first entryId in the payload block, the 

[GitHub] sijie commented on issue #1715: Broker should not start replicator for root partitioned-topic

2018-05-02 Thread GitBox
sijie commented on issue #1715: Broker should not start replicator for root 
partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1715#issuecomment-386048749
 
 
   retest this please




[GitHub] sijie commented on issue #1716: Fix: deadlock while closing non-shared consumer

2018-05-02 Thread GitBox
sijie commented on issue #1716: Fix: deadlock while closing non-shared consumer
URL: https://github.com/apache/incubator-pulsar/pull/1716#issuecomment-386048679
 
 
   retest this please




[incubator-pulsar] branch master updated: Rest API for Ledger Offloading (#1639)

2018-05-02 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 5f678e0  Rest API for Ledger Offloading (#1639)
5f678e0 is described below

commit 5f678e099095b133138ea0e536e06e916d8dc09e
Author: Ivan Kelly 
AuthorDate: Wed May 2 18:59:58 2018 +0200

Rest API for Ledger Offloading (#1639)

* Rest API for Ledger Offloading

Implemented for both V1 and V2 topic name formats. API takes a message
ID, up to which the broker will try to offload messages. It returns
the message ID of the first message in the topic which has not been
offloaded.

This patch also adds basic support for setting the Offloader
implementation in the broker (needed for testing). Subsequent patches
will make this configurable through ServiceConfiguration.

* Split compaction endpoint into two

One for triggering and one for getting the current status.

* Add conflict to rest api doc

* Fixed build
---
 .../org/apache/pulsar/broker/PulsarService.java|   6 +
 .../broker/admin/impl/PersistentTopicsBase.java|  24 +++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  37 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  35 -
 .../pulsar/broker/service/BrokerService.java   |   2 +
 .../broker/service/persistent/PersistentTopic.java |  57 +++-
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 148 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  14 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java|  10 +-
 .../client/admin/LongRunningProcessStatus.java |  18 +--
 .../pulsar/client/admin/OffloadProcessStatus.java  |  55 
 .../org/apache/pulsar/client/admin/Topics.java |  19 ++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  30 -
 .../pulsar/admin/cli/CmdPersistentTopics.java  |   7 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java |   7 +-
 15 files changed, 426 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c74ecd7..8d39fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -46,7 +46,9 @@ import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -635,6 +637,10 @@ public class PulsarService implements AutoCloseable {
 return managedLedgerClientFactory.getManagedLedgerFactory();
 }
 
+public LedgerOffloader getManagedLedgerOffloader() {
+return NullLedgerOffloader.INSTANCE;
+}
+
 public ZooKeeperCache getLocalZkCache() {
 return localZkCache;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ff50ec1..a354008 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -75,6 +76,8 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -86,7 +89,6 @@ import 

[GitHub] sijie closed pull request #1639: Rest API for Ledger Offloading

2018-05-02 Thread GitBox
sijie closed pull request #1639: Rest API for Ledger Offloading
URL: https://github.com/apache/incubator-pulsar/pull/1639
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3d14b8b08f..6909a82a99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -46,7 +46,9 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -635,6 +637,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
 return managedLedgerClientFactory.getManagedLedgerFactory();
 }
 
+public LedgerOffloader getManagedLedgerOffloader() {
+return NullLedgerOffloader.INSTANCE;
+}
+
 public ZooKeeperCache getLocalZkCache() {
 return localZkCache;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 21b14d075f..44f80cf166 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -54,6 +54,7 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -74,6 +75,8 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -85,7 +88,6 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -1090,12 +1092,30 @@ protected void internalTriggerCompaction(boolean 
authoritative) {
 }
 }
 
-protected CompactionStatus internalCompactionStatus(boolean authoritative) 
{
+protected LongRunningProcessStatus internalCompactionStatus(boolean 
authoritative) {
 validateAdminOperationOnTopic(authoritative);
 PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
 return topic.compactionStatus();
 }
 
+protected void internalTriggerOffload(boolean authoritative, MessageIdImpl 
messageId) {
+validateAdminOperationOnTopic(authoritative);
+PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+try {
+topic.triggerOffload(messageId);
+} catch (AlreadyRunningException e) {
+throw new RestException(Status.CONFLICT, e.getMessage());
+} catch (Exception e) {
+throw new RestException(e);
+}
+}
+
+protected OffloadProcessStatus internalOffloadStatus(boolean 
authoritative) {
+validateAdminOperationOnTopic(authoritative);
+PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+return topic.offloadStatus();
+}
+
 public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
 String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) {
 CompletableFuture metadataFuture 

[GitHub] srkukarni commented on a change in pull request #1708: Refactor functions to use Sink interface

2018-05-02 Thread GitBox
srkukarni commented on a change in pull request #1708: Refactor functions to 
use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185565400
 
 

 ##
 File path: 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
 ##
 @@ -45,5 +45,5 @@
  * @param value output value
  * @return Completable future fo async publish request
  */
-CompletableFuture write(T value);
-}
\ No newline at end of file
+CompletableFuture write(T value) throws Exception;
 
 Review comment:
   +1 on this 
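The signature change under discussion adds `throws Exception` to `write`, letting a sink fail fast synchronously (for example when it has been closed) while still returning a future for the asynchronous publish. A minimal sketch of both paths (the `InMemorySink` is hypothetical, not part of Pulsar):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Minimal version of the Sink interface as changed in this PR.
interface Sink<T> {
    CompletableFuture<Void> write(T value) throws Exception;
}

// Hypothetical sink that records values in memory. It narrows the
// throws clause: synchronous failures are unchecked here, while async
// success is signalled through the returned future.
class InMemorySink implements Sink<String> {
    final List<String> records = new ArrayList<>();
    private boolean closed = false;

    void close() { closed = true; }

    @Override
    public CompletableFuture<Void> write(String value) {
        if (closed) {
            // synchronous failure path enabled by `throws Exception`
            throw new IllegalStateException("sink is closed");
        }
        records.add(value);
        return CompletableFuture.completedFuture(null);
    }
}
```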


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1639: Rest API for Ledger Offloading

2018-05-02 Thread GitBox
sijie commented on issue #1639: Rest API for Ledger Offloading
URL: https://github.com/apache/incubator-pulsar/pull/1639#issuecomment-386044513
 
 
   @ivankelly yeah, will review. I was holding it until after Matteo cut the 2.0 branch.




[GitHub] jai1 commented on issue #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
jai1 commented on issue #1707: Fixed authentication flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#issuecomment-386043230
 
 
   @merlimat - ok, changed the milestone to 2.0.1. Can you review this PR?




[GitHub] merlimat commented on issue #1705: Add release notes page in website

2018-05-02 Thread GitBox
merlimat commented on issue #1705: Add release notes page in website
URL: https://github.com/apache/incubator-pulsar/pull/1705#issuecomment-386038126
 
 
   retest this please 




[GitHub] merlimat commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT

2018-05-02 Thread GitBox
merlimat commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
URL: https://github.com/apache/incubator-pulsar/pull/1709#issuecomment-386037127
 
 
   retest this please




[GitHub] merlimat commented on issue #1707: Fixed authentication flow via Pulsar Proxy

2018-05-02 Thread GitBox
merlimat commented on issue #1707: Fixed authentication flow via Pulsar Proxy
URL: https://github.com/apache/incubator-pulsar/pull/1707#issuecomment-386035196
 
 
   @jai1 This doesn't seem to me like a super-critical error that should stop the current RC. I think we can bump it to 2.0.1, where we'll get all the fixes (and we should do that soon).




[incubator-pulsar] branch master updated: Added missing license header in terraform.tfvars (#1706)

2018-05-02 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b837af8  Added missing license header in terraform.tfvars (#1706)
b837af8 is described below

commit b837af8612165bb880d7d66f8291f8163a5507a5
Author: Matteo Merli 
AuthorDate: Wed May 2 09:05:50 2018 -0700

Added missing license header in terraform.tfvars (#1706)
---
 deployment/terraform-ansible/aws/terraform.tfvars | 19 +++
 pom.xml   |  4 
 tests/README.md   | 21 +
 3 files changed, 44 insertions(+)

diff --git a/deployment/terraform-ansible/aws/terraform.tfvars 
b/deployment/terraform-ansible/aws/terraform.tfvars
index 65f4ea5..7af884b 100644
--- a/deployment/terraform-ansible/aws/terraform.tfvars
+++ b/deployment/terraform-ansible/aws/terraform.tfvars
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 public_key_path = "~/.ssh/id_rsa.pub"
 region  = "us-west-2"
 availability_zone   = "us-west-2a"
diff --git a/pom.xml b/pom.xml
index 761c710..29e8dca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -904,6 +904,7 @@ flexible messaging model and an intuitive client 
API.
 SCRIPT_STYLE
 SCRIPT_STYLE
 SCRIPT_STYLE
+SCRIPT_STYLE
   
 
   
@@ -967,6 +968,9 @@ flexible messaging model and an intuitive client 
API.
 **/*.htpasswd
 src/test/resources/athenz.conf.test
 deployment/terraform-ansible/templates/myid
+
+
+**/requirements.txt
   
 
   
diff --git a/tests/README.md b/tests/README.md
index 2fb4e8f..ccd4a8f 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -1,3 +1,24 @@
+
+
 This directory contains integration tests for Pulsar.
 
The integration tests use a framework called [Arquillian Cube](https://github.com/arquillian/arquillian-cube) to bring up a set of Docker containers running Pulsar services. TestNG can then be used to test functionality against these containers.

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.


[GitHub] merlimat closed pull request #1706: Added missing license header in terraform.tfvars

2018-05-02 Thread GitBox
merlimat closed pull request #1706: Added missing license header in 
terraform.tfvars
URL: https://github.com/apache/incubator-pulsar/pull/1706
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/deployment/terraform-ansible/aws/terraform.tfvars 
b/deployment/terraform-ansible/aws/terraform.tfvars
index 65f4ea528b..7af884b91a 100644
--- a/deployment/terraform-ansible/aws/terraform.tfvars
+++ b/deployment/terraform-ansible/aws/terraform.tfvars
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 public_key_path = "~/.ssh/id_rsa.pub"
 region  = "us-west-2"
 availability_zone   = "us-west-2a"
diff --git a/pom.xml b/pom.xml
index 761c710fd2..29e8dca769 100644
--- a/pom.xml
+++ b/pom.xml
@@ -904,6 +904,7 @@ flexible messaging model and an intuitive client 
API.
 SCRIPT_STYLE
 SCRIPT_STYLE
 SCRIPT_STYLE
+SCRIPT_STYLE
   
 
   
@@ -967,6 +968,9 @@ flexible messaging model and an intuitive client 
API.
 **/*.htpasswd
 src/test/resources/athenz.conf.test
 deployment/terraform-ansible/templates/myid
+
+
+**/requirements.txt
   
 
   
diff --git a/tests/README.md b/tests/README.md
index 2fb4e8f58d..ccd4a8fe73 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -1,3 +1,24 @@
+
+
 This directory contains integration tests for Pulsar.
 
The integration tests use a framework called [Arquillian Cube](https://github.com/arquillian/arquillian-cube) to bring up a set of Docker containers running Pulsar services. TestNG can then be used to test functionality against these containers.


 




[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185533001
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
+}
 
-if (entryByteBuf.readableBytes() == 0) {
-entryByteBuf.release();
-entriesByteBuf.remove(0);
+// always read from the first ByteBuf in the list, once read all of 
its content remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset ++;
+
+if (entryByteBuf.readableBytes() == 0) {
+entryByteBuf.release();
+entriesByteBuf.remove(0);
+blockEntryCount++;
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + 
entriesByteBuf.get(0).readableBytes() > blockSize) {
 
 Review comment:
   Thanks, will change it. This change does not follow the earlier comments for two reasons: we need to keep dataBlockFullOffset to calculate the exact number of bytes read from the ledger, and the check `(bytesReadOffset + entryByteBuf.bytesAvailable()) > blockSize` only needs to happen when a new entry is read.
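The check being discussed decides whether the next whole entry still fits in the fixed-size block. A simplified model of that capacity test (the class and method names are illustrative; the real code compares bytesReadOffset plus the next buffer's readable bytes against blockSize):

```java
// Simplified model of the block-capacity check: an entry is admitted
// only if its serialized form (12-byte header of entry size + entry id,
// plus the payload) still fits within the fixed block size.
class BlockCapacity {
    static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;

    static boolean fits(int bytesReadOffset, int entryPayloadBytes, int blockSize) {
        return bytesReadOffset + ENTRY_HEADER_SIZE + entryPayloadBytes <= blockSize;
    }
}
```

Once this test fails, the remaining buffered entries are released and the rest of the block is padding, which is why the stream records the offset as dataBlockFullOffset.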




[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185509828
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
 
 Review comment:
   it will return an empty list, and line 93 will throw an IndexOutOfBoundsException. 
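The failure mode described here, an empty result list followed by get(0), can be reproduced with plain collections. A simplified sketch (EntryReader and its byte-array entries are illustrative stand-ins for the ledger read path) showing the guard that avoids the IndexOutOfBoundsException:

```java
import java.util.ArrayList;
import java.util.List;

class EntryReader {
    // Stand-in for readNextEntriesFromLedger: past the last confirmed
    // entry it returns an empty list rather than null.
    static List<byte[]> readNext(long start, long lastConfirmed) {
        List<byte[]> out = new ArrayList<>();
        for (long id = start; id <= lastConfirmed && out.size() < 100; id++) {
            out.add(new byte[]{(byte) id});
        }
        return out;
    }

    // Checking for emptiness again after the refill avoids calling
    // get(0) on an empty list, which is the IndexOutOfBoundsException
    // this review thread points at.
    static int readFirstByte(List<byte[]> buffered, long start, long lastConfirmed) {
        if (buffered.isEmpty()) {
            buffered.addAll(readNext(start, lastConfirmed));
        }
        if (buffered.isEmpty()) {
            return -1; // end of stream
        }
        return buffered.get(0)[0];
    }
}
```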




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185498765
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
 
 Review comment:
   what happens if readNextEntriesFromLedger cannot read any more entries?




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185498146
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
+}
 
-if (entryByteBuf.readableBytes() == 0) {
-entryByteBuf.release();
-entriesByteBuf.remove(0);
+// always read from the first ByteBuf in the list, once read all of 
its content remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset ++;
+
+if (entryByteBuf.readableBytes() == 0) {
+entryByteBuf.release();
+entriesByteBuf.remove(0);
+blockEntryCount++;
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + 
entriesByteBuf.get(0).readableBytes() > blockSize) {
+// not able to place a new Entry.
+entriesByteBuf.forEach(buf -> buf.release());
+entriesByteBuf.clear();
+dataBlockFullOffset = bytesReadOffset;
 }
-
-return ret;
-} catch (InterruptedException | ExecutionException e) {
-log.error("Exception when get CompletableFuture. ", 
e);
-throw new IOException(e);
 }
-}
 
-// read entries from ledger, and pre-handle the returned ledgerEntries.
-private void readNextEntriesFromLedger() throws InterruptedException, 
ExecutionException {
-checkState(bytesReadOffset == blockBytesHave);
+return ret;
+}
 
-long start = startEntryId + blockEntryCount;
-long end = Math.min(start + entriesNumberEachRead - 1, 
ledger.getLastAddConfirmed());
+private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
+long end = Math.min(start + maxNumberEntries - 1, 
ledger.getLastAddConfirmed());
 try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, 
end).get()) {
 log.debug("read ledger entries. start: {}, end: {}", start, end);
 
+List<ByteBuf> entries = Lists.newLinkedList();
+
 Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
-long entryId = start;
 while (iterator.hasNext()) {
 LedgerEntry entry = iterator.next();
 int entryLength = (int) entry.getLength();
-entryId = entry.getEntryId();
-
-if (blockSize - blockBytesHave >= entryLength + 
entryHeaderSize) {
-// data block has space for this entry, keep this entry
-CompositeByteBuf entryBuf = 
PooledByteBufAllocator.DEFAULT.compositeBuffer();
-ByteBuf entryHeaderBuf = 
PooledByteBufAllocator.DEFAULT.buffer(entryHeaderSize, entryHeaderSize);
-
-entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
-entryBuf.addComponents(entryHeaderBuf, 
entry.getEntryBuffer().retain());
-entryBuf.writerIndex(entryHeaderSize + entryLength);
-
-entriesByteBuf.add(entryBuf);
-
-// set counters
-blockEntryCount++;
-payloadBytesHave += entryLength;
-blockBytesHave += entryLength + entryHeaderSize;
-} else {
-// data block has no space left for a whole message entry
-dataBlockFullOffset = blockBytesHave;
-break;
-}
+long entryId = entry.getEntryId();
+
+CompositeByteBuf entryBuf = 
PooledByteBufAllocator.DEFAULT.compositeBuffer();
+ByteBuf entryHeaderBuf = 
PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+
+

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185496454
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
+}
 
-if (entryByteBuf.readableBytes() == 0) {
-entryByteBuf.release();
-entriesByteBuf.remove(0);
+// always read from the first ByteBuf in the list, once read all of 
its content remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset ++;
+
+if (entryByteBuf.readableBytes() == 0) {
+entryByteBuf.release();
+entriesByteBuf.remove(0);
+blockEntryCount++;
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + 
entriesByteBuf.get(0).readableBytes() > blockSize) {
+// not able to place a new Entry.
+entriesByteBuf.forEach(buf -> buf.release());
+entriesByteBuf.clear();
+dataBlockFullOffset = bytesReadOffset;
 }
-
-return ret;
-} catch (InterruptedException | ExecutionException e) {
-log.error("Exception when get CompletableFuture. ", 
e);
-throw new IOException(e);
 }
-}
 
-// read entries from ledger, and pre-handle the returned ledgerEntries.
-private void readNextEntriesFromLedger() throws InterruptedException, 
ExecutionException {
-checkState(bytesReadOffset == blockBytesHave);
+return ret;
+}
 
-long start = startEntryId + blockEntryCount;
-long end = Math.min(start + entriesNumberEachRead - 1, 
ledger.getLastAddConfirmed());
+private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
+long end = Math.min(start + maxNumberEntries - 1, 
ledger.getLastAddConfirmed());
 try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, 
end).get()) {
 log.debug("read ledger entries. start: {}, end: {}", start, end);
 
+List<ByteBuf> entries = Lists.newLinkedList();
+
 Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
-long entryId = start;
 while (iterator.hasNext()) {
 LedgerEntry entry = iterator.next();
 int entryLength = (int) entry.getLength();
-entryId = entry.getEntryId();
-
-if (blockSize - blockBytesHave >= entryLength + 
entryHeaderSize) {
-// data block has space for this entry, keep this entry
-CompositeByteBuf entryBuf = 
PooledByteBufAllocator.DEFAULT.compositeBuffer();
-ByteBuf entryHeaderBuf = 
PooledByteBufAllocator.DEFAULT.buffer(entryHeaderSize, entryHeaderSize);
-
-entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
-entryBuf.addComponents(entryHeaderBuf, 
entry.getEntryBuffer().retain());
-entryBuf.writerIndex(entryHeaderSize + entryLength);
-
-entriesByteBuf.add(entryBuf);
-
-// set counters
-blockEntryCount++;
-payloadBytesHave += entryLength;
-blockBytesHave += entryLength + entryHeaderSize;
-} else {
-// data block has no space left for a whole message entry
-dataBlockFullOffset = blockBytesHave;
-break;
-}
+long entryId = entry.getEntryId();
+
+CompositeByteBuf entryBuf = 
PooledByteBufAllocator.DEFAULT.compositeBuffer();
 
 Review comment:
   nit: specify that the compositeBuffer will have 2 buffers, since we know it will.
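The entry framing being assembled here is [entry_size -- int][entry_id -- long][entry_data], a fixed 12-byte header in front of each payload. A self-contained sketch of that framing using the JDK's ByteBuffer rather than Netty's two-component CompositeByteBuf (EntryFraming is illustrative, not the Pulsar class):

```java
import java.nio.ByteBuffer;

// Sketch of the data-block entry framing: a 12-byte header (4-byte
// entry size + 8-byte entry id) followed by the entry payload.
class EntryFraming {
    static final int ENTRY_HEADER_SIZE = 4 + 8;

    static ByteBuffer frame(long entryId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_HEADER_SIZE + payload.length);
        buf.putInt(payload.length).putLong(entryId).put(payload);
        buf.flip(); // ready for reading, mirroring the writerIndex handling
        return buf;
    }
}
```

The Netty version avoids the copy by composing the header buffer and the retained entry buffer, which is why sizing the composite for exactly two components is a cheap, accurate hint.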


[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185495929
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -90,68 +84,60 @@ private int readEntries() throws IOException {
 checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
 checkState(bytesReadOffset < dataBlockFullOffset);
 
-try {
-// once reach the end of entry buffer, start a new read.
-if (entriesByteBuf.isEmpty()) {
-readNextEntriesFromLedger();
-log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
-bytesReadOffset, blockBytesHave);
-}
-
-// always read from the first ByteBuf in the list, once read all 
of its content remove it.
-ByteBuf entryByteBuf = entriesByteBuf.get(0);
-int ret = entryByteBuf.readByte();
-bytesReadOffset ++;
+// once reach the end of entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+entriesByteBuf = readNextEntriesFromLedger(startEntryId + 
blockEntryCount, ENTRIES_PER_READ);
+}
 
-if (entryByteBuf.readableBytes() == 0) {
-entryByteBuf.release();
-entriesByteBuf.remove(0);
+// always read from the first ByteBuf in the list, once read all of 
its content remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset ++;
+
+if (entryByteBuf.readableBytes() == 0) {
+entryByteBuf.release();
+entriesByteBuf.remove(0);
+blockEntryCount++;
+if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + 
entriesByteBuf.get(0).readableBytes() > blockSize) {
 
 Review comment:
   What if entriesByteBuf is empty, but the first entry in the next read batch would put you over the blockSize?
   This check needs to move to before line 92 (that's why I had the if-else in the previous comment).




[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185492114
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * The BlockAwareSegmentInputStream for each cold storage data block.
+ * It contains a byte buffer, which contains all the content for this data block.
+ *  DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]])
+ *
+ */
+public class BlockAwareSegmentInputStream extends InputStream {
 
 Review comment:
   thanks, will do it later.
   




[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185491684
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * The BlockAwareSegmentInputStream for each cold storage data block.
+ * It contains a byte buffer, which contains all the content for this data block.
+ *  DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]])
+ *
+ */
+public class BlockAwareSegmentInputStream extends InputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStream.class);
+
+private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries read from the ledger and read out from this InputStream.
+private int blockEntryCount;
+// Number of payload bytes read from the ledger and kept in this InputStream.
+private int payloadBytesHave;
+// Number of bytes that have been kept in this InputStream.
+private int blockBytesHave;
+
+// tracking read status for both header and entries.
+// Bytes that already been read from this InputStream
+private int bytesReadOffset = 0;
+// Byte from this index is all padding byte
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries to read from the ReadHandle each time.
+private static final int entriesNumberEachRead = 100;
+// buffer for the entry size and entry id.
+private static final int entryHeaderSize = 4 /* entry size */ + 8 /* entry id */;
+// Keep a list of all entries ByteBuf, each element contains 2 buf: entry 
header and entry content.
+private List entriesByteBuf = null;
+
+public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, 
int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset();
+this.payloadBytesHave = 0;
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < dataBlockFullOffset);
+
+try {
+// Once we reach the end of the entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+readNextEntriesFromLedger();
+log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
+bytesReadOffset, blockBytesHave);
+}
+
+// Always read from the first ByteBuf in the list; once all of its content has been read, remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset++;
+
+  
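The entry framing described in the class javadoc ([entry_size -- int][entry_id -- long][entry_data]) can be sketched with the JDK alone; the class and helper names below are hypothetical, not part of the PR:

```java
import java.nio.ByteBuffer;

public class EntryFraming {
    // 4 bytes for entry_size (int) + 8 bytes for entry_id (long),
    // matching the entryHeaderSize constant quoted above.
    static final int ENTRY_HEADER_SIZE = 4 + 8;

    // Hypothetical helper: frame one entry as
    // [entry_size -- int][entry_id -- long][entry_data].
    static ByteBuffer frame(long entryId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_HEADER_SIZE + payload.length);
        buf.putInt(payload.length);  // entry_size
        buf.putLong(entryId);        // entry_id
        buf.put(payload);            // entry_data
        buf.flip();                  // prepare for reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer b = frame(7L, new byte[]{1, 2, 3});
        System.out.println(b.getInt());   // entry_size: 3
        System.out.println(b.getLong());  // entry_id: 7
    }
}
```

The PR itself uses Netty ByteBufs rather than ByteBuffer; this is only meant to make the on-disk layout concrete.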

[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
zhaijack commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185464725
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+// List of entry ByteBufs; each element is composed of 2 bufs: the entry header and the entry content.
+private List<ByteBuf> entriesByteBuf = null;
 
 Review comment:
   Thanks, it could be. I would like to keep it, or change it to List, which is clearer for readers of the code.
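The consumption pattern for this list, quoted in readEntries above (always read from the first buffer and drop it once exhausted, refilling when the list runs empty), can be sketched without Netty; the class name, the byte[] element type, and the refill stub are assumptions for illustration:

```java
import java.util.LinkedList;
import java.util.List;

public class BufferListReader {
    private final List<byte[]> buffers = new LinkedList<>();
    private int posInFirst = 0;
    private int next = 0;

    // Stand-in for readNextEntriesFromLedger(): append the next batch.
    private void refill() {
        buffers.add(new byte[]{(byte) next++, (byte) next++});
    }

    // Mirrors the quoted read path: refill when empty, consume the head
    // buffer byte by byte, and remove it once fully read.
    public int read() {
        if (buffers.isEmpty()) {
            refill();
        }
        byte[] first = buffers.get(0);
        int ret = first[posInFirst++] & 0xFF;  // unsigned, per the InputStream.read() contract
        if (posInFirst == first.length) {
            buffers.remove(0);
            posInFirst = 0;
        }
        return ret;
    }

    public static void main(String[] args) {
        BufferListReader r = new BufferListReader();
        for (int i = 0; i < 5; i++) {
            System.out.print(r.read() + " ");  // 0 1 2 3 4
        }
    }
}
```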


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185438219
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441127
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD);
 
 Review comment:
   BLOCK_PADDING
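For reference, Guava's Ints.toByteArray is big-endian, so the padding constant above is the byte sequence 0xFE 0xDC 0xDE 0xAD. A JDK-only equivalent, as a sketch (the helper name is hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class PaddingBytes {
    // Big-endian serialization of the padding magic, equivalent to
    // Guava's Ints.toByteArray(0xFEDCDEAD); ByteBuffer defaults to big-endian.
    static byte[] blockEndPadding() {
        return ByteBuffer.allocate(4).putInt(0xFEDCDEAD).array();
    }

    public static void main(String[] args) {
        byte[] p = blockEndPadding();
        System.out.println(Arrays.equals(
                p, new byte[]{(byte) 0xFE, (byte) 0xDC, (byte) 0xDE, (byte) 0xAD}));  // true
    }
}
```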




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456144
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456500
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+public class BlockAwareSegmentInputStream extends InputStream {
 
 Review comment:
   Could we define an interface and impl for this, so it's clear what extra 
data BlockAwareSegmentInputStream exposes?
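A minimal sketch of what such an interface might look like; the interface and method names here are assumptions for illustration, not the shape the PR eventually adopted:

```java
// Hypothetical interface separating the extra block metadata from the plain
// InputStream contract, as the review suggests.
interface SegmentBlockStream {
    long getStartEntryId();   // id of the first entry packed into the block
    int getBlockSize();       // fixed output size: header + entries + padding
    int getBlockEntryCount(); // entries packed into the block so far
}

public class SegmentBlockStreamDemo {
    // Trivial stand-in implementation, just to show the shape; the real class
    // would extend InputStream and implement this interface.
    static SegmentBlockStream of(long startEntryId, int blockSize) {
        return new SegmentBlockStream() {
            public long getStartEntryId() { return startEntryId; }
            public int getBlockSize() { return blockSize; }
            public int getBlockEntryCount() { return 0; }
        };
    }

    public static void main(String[] args) {
        SegmentBlockStream s = of(0L, 1 << 20);
        System.out.println(s.getBlockSize());  // 1048576
    }
}
```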




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441005
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+// Size of the buffered entry header: entry size (int) and entry id (long).
+private static final int entryHeaderSize = 4 /* entry size */ + 8 /* entry id */;
 
 Review comment:
   ENTRY_HEADER_SIZE




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441047
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+// How many entries to read from the ReadHandle in each batch.
+private static final int entriesNumberEachRead = 100;
 
 Review comment:
   ENTRIES_PER_READ




[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185453817
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * The BlockAwareSegmentInputStream for each cold storage data block.
+ * It contains a byte buffer, which contains all the content for this data 
block.
+ *  DataBlockHeader + entries(each with format[[entry_size -- 
int][entry_id -- long][entry_data]])
+ *
+ */
+public class BlockAwareSegmentInputStream extends InputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStream.class);
+
+private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of Message entries that read from ledger and been readout from 
this InputStream.
+private int blockEntryCount;
+// Number of payload Bytes read from ledger, and has been has been kept in 
this InputStream.
+private int payloadBytesHave;
+// Number of bytes that has been kept in this InputStream.
+private int blockBytesHave;
+
+// tracking read status for both header and entries.
+// Bytes that already been read from this InputStream
+private int bytesReadOffset = 0;
+// Byte from this index is all padding byte
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// how many entries want to read from ReadHandle each time.
+private static final int entriesNumberEachRead = 100;
+// buf the entry size and entry id.
+private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry 
id */;
+// Keep a list of all entries ByteBuf, each element contains 2 buf: entry 
header and entry content.
+private List entriesByteBuf = null;
+
+public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, 
int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset();
+this.payloadBytesHave = 0;
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < dataBlockFullOffset);
+
+try {
+// Once we reach the end of the entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+readNextEntriesFromLedger();
+log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
+bytesReadOffset, blockBytesHave);
+}
+
+// Always read from the first ByteBuf in the list; once all of its content has been read, remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset++;
+
+ 
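The entry framing described in the quoted javadoc (a 4-byte size, an 8-byte entry id, then the payload) can be sketched as below. This is a minimal, hypothetical illustration of the layout only; the class and method names here are invented and not part of the Pulsar implementation.

```java
import java.nio.ByteBuffer;

public class EntryFraming {
    // Matches the layout in the javadoc above:
    // [entry_size -- int][entry_id -- long][entry_data].
    static final int ENTRY_HEADER_SIZE = 4 + 8;

    // Pack one entry header followed by its payload into a byte array.
    static byte[] frame(long entryId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_HEADER_SIZE + payload.length);
        buf.putInt(payload.length); // entry_size
        buf.putLong(entryId);       // entry_id
        buf.put(payload);           // entry_data
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(7L, new byte[]{1, 2, 3});
        // 12-byte header + 3-byte payload
        System.out.println(framed.length);
    }
}
```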

[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185435514
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStream for a single cold storage data block.
+ * It wraps a byte buffer that holds the full content of the block:
+ *  DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]])
+ */
+public class BlockAwareSegmentInputStream extends InputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStream.class);
+
+private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries read from the ledger into this InputStream.
+private int blockEntryCount;
+// Number of payload bytes read from the ledger and kept in this InputStream.
+private int payloadBytesHave;
+// Total number of bytes kept in this InputStream.
+private int blockBytesHave;
+
+// Tracking read status for both header and entries.
+// Bytes already read from this InputStream.
+private int bytesReadOffset = 0;
+// Bytes from this index on are all padding bytes.
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// How many entries to read from the ReadHandle on each read.
+private static final int entriesNumberEachRead = 100;
+// Buffer size for the entry size and entry id.
+private static final int entryHeaderSize = 4 /* entry size */ + 8 /* entry id */;
+// List of entry ByteBufs; each element contains 2 bufs: entry header and entry content.
+private List entriesByteBuf = null;
 
 Review comment:
   It can be a List. The user of the buffer doesn't need to know 
what's backing it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test

2018-05-02 Thread GitBox
ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456251
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
 ##
 @@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStream for a single cold storage data block.
+ * It wraps a byte buffer that holds the full content of the block:
+ *  DataBlockHeader + entries (each with format [[entry_size -- int][entry_id -- long][entry_data]])
+ */
+public class BlockAwareSegmentInputStream extends InputStream {
+private static final Logger log = 
LoggerFactory.getLogger(BlockAwareSegmentInputStream.class);
+
+private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD);
+
+private final ReadHandle ledger;
+private final long startEntryId;
+private final int blockSize;
+
+// Number of message entries read from the ledger into this InputStream.
+private int blockEntryCount;
+// Number of payload bytes read from the ledger and kept in this InputStream.
+private int payloadBytesHave;
+// Total number of bytes kept in this InputStream.
+private int blockBytesHave;
+
+// Tracking read status for both header and entries.
+// Bytes already read from this InputStream.
+private int bytesReadOffset = 0;
+// Bytes from this index on are all padding bytes.
+private int dataBlockFullOffset;
+private final InputStream dataBlockHeaderStream;
+
+// How many entries to read from the ReadHandle on each read.
+private static final int entriesNumberEachRead = 100;
+// Buffer size for the entry size and entry id.
+private static final int entryHeaderSize = 4 /* entry size */ + 8 /* entry id */;
+// List of entry ByteBufs; each element contains 2 bufs: entry header and entry content.
+private List entriesByteBuf = null;
+
+public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, 
int blockSize) {
+this.ledger = ledger;
+this.startEntryId = startEntryId;
+this.blockSize = blockSize;
+this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, 
startEntryId).toStream();
+this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset();
+this.payloadBytesHave = 0;
+this.blockEntryCount = 0;
+this.dataBlockFullOffset = blockSize;
+this.entriesByteBuf = Lists.newLinkedList();
+}
+
+// read ledger entries.
+private int readEntries() throws IOException {
+checkState(bytesReadOffset >= 
DataBlockHeaderImpl.getDataStartOffset());
+checkState(bytesReadOffset < dataBlockFullOffset);
+
+try {
+// Once we reach the end of the entry buffer, start a new read.
+if (entriesByteBuf.isEmpty()) {
+readNextEntriesFromLedger();
+log.debug("After readNextEntriesFromLedger: bytesReadOffset: 
{}, blockBytesHave: {}",
+bytesReadOffset, blockBytesHave);
+}
+
+// Always read from the first ByteBuf in the list; once all of its content has been read, remove it.
+ByteBuf entryByteBuf = entriesByteBuf.get(0);
+int ret = entryByteBuf.readByte();
+bytesReadOffset++;
+
+ 

[GitHub] ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation

2018-05-02 Thread GitBox
ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and 
implementation
URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-385912117
 
 
   Weird. It looks like the test hangs on MessageIdTest for over 2 hours




[GitHub] ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation

2018-05-02 Thread GitBox
ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and 
implementation
URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-385912117
 
 
   Weird. It looks like the test hangs on MessageIdTest for over 2 hours. 
Obviously nothing to do with this change though.




[GitHub] ivankelly commented on issue #1687: Don't offload empty ledgers

2018-05-02 Thread GitBox
ivankelly commented on issue #1687: Don't offload empty ledgers
URL: https://github.com/apache/incubator-pulsar/pull/1687#issuecomment-385909859
 
 
   retest this please // timeout on unit tests




[GitHub] ivankelly commented on issue #1639: Rest API for Ledger Offloading

2018-05-02 Thread GitBox
ivankelly commented on issue #1639: Rest API for Ledger Offloading
URL: https://github.com/apache/incubator-pulsar/pull/1639#issuecomment-385909641
 
 
   @sijie pinging. This is ready for review again.




[incubator-pulsar] branch branch-1.22 updated: Fix: deadlock while closing non-persistent topic

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new 1a31f91  Fix: deadlock while closing non-persistent topic
1a31f91 is described below

commit 1a31f91660de11060e0286e83ea35ef745e596f7
Author: Jai Asher 
AuthorDate: Wed May 2 01:28:40 2018 -0700

Fix: deadlock while closing non-persistent topic
---
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e14..5d6b4ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -462,8 +462,12 @@ public class NonPersistentTopic implements Topic {
 
 FutureUtil.waitForAll(futures).thenRun(() -> {
 log.info("[{}] Topic closed", topic);
-brokerService.pulsar().getExecutor().submit(() -> 
brokerService.removeTopicFromCache(topic));
-closeFuture.complete(null);
+// Unloading a topic iterates over the topics map; removing from the map on the same thread creates a deadlock.
+// So execute the removal on a different thread.
+brokerService.executor().execute(() -> {
+brokerService.removeTopicFromCache(topic);
+closeFuture.complete(null);
+});
 }).exceptionally(exception -> {
 log.error("[{}] Error closing topic", topic, exception);
 isFenced = false;

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.
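The pattern in the fix above, deferring the map removal to another thread instead of mutating the map while it is being iterated, can be sketched as follows. This is a minimal, hypothetical illustration (a plain ConcurrentHashMap and a single-thread executor stand in for Pulsar's actual topics cache and broker executor):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DeferredRemoval {
    static final Map<String, Runnable> topics = new ConcurrentHashMap<>();
    static final ExecutorService executor = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        topics.put("t1", () -> {});
        // Unload iterates over the map; each close callback defers its own
        // removal to another thread rather than removing mid-iteration.
        for (Map.Entry<String, Runnable> e : topics.entrySet()) {
            String name = e.getKey();
            executor.execute(() -> topics.remove(name));
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(topics.size());
    }
}
```

With a lock-striped map (as in the original bug), the synchronous removal would block on a stripe lock already held by the iterating thread; handing it off breaks that cycle.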


[incubator-pulsar] branch branch-1.22 updated: Relocate service files for shading `pulsar-client-admin` module

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new 4c946b6  Relocate service files for shading `pulsar-client-admin` 
module
4c946b6 is described below

commit 4c946b6d70172347d07057f8cb5a2b33ca327d3c
Author: Jai Asher 
AuthorDate: Wed May 2 01:28:21 2018 -0700

Relocate service files for shading `pulsar-client-admin` module
---
 pom.xml| 2 +-
 pulsar-client-admin-shaded/pom.xml | 4 
 pulsar-client-shaded/pom.xml   | 4 
 3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index b4e63d7..daa6c5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -805,7 +805,7 @@ flexible messaging model and an intuitive client 
API.
 
   org.apache.maven.plugins
   maven-shade-plugin
-  2.4.2
+  3.1.0
 
 
   maven-enforcer-plugin
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index c5624fe..f72243f 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -185,6 +185,10 @@
   
org.apache.pulsar.admin.shade.org.reactivestreams
 
   
+  
+
+
+  
 
   
 
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 81f873e..9219c1f 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -160,6 +160,10 @@
   
org.apache.pulsar.shade.org.apache.http
 
   
+  
+
+
+  
 
   
 



[GitHub] jai1 closed pull request #1712: Fix: NPE when cursor failed to close empty subscription

2018-05-02 Thread GitBox
jai1 closed pull request #1712: Fix: NPE when cursor failed to close empty 
subscription
URL: https://github.com/apache/incubator-pulsar/pull/1712
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e742f148cb..5e6153ba40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -533,7 +533,9 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
 disconnectFuture.complete(null);
 }).exceptionally(exception -> {
 IS_FENCED_UPDATER.set(this, FALSE);
-dispatcher.reset();
+if (dispatcher != null) {
+dispatcher.reset();
+}
 log.error("[{}][{}] Error disconnecting consumers from 
subscription", topicName, subName,
 exception);
 disconnectFuture.completeExceptionally(exception);


 




[GitHub] jai1 closed pull request #1713: Fix: deadlock while closing non-persistent topic

2018-05-02 Thread GitBox
jai1 closed pull request #1713: Fix: deadlock while closing non-persistent topic
URL: https://github.com/apache/incubator-pulsar/pull/1713
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e14434..5d6b4baf66 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -462,8 +462,12 @@ void removeSubscription(String subscriptionName) {
 
 FutureUtil.waitForAll(futures).thenRun(() -> {
 log.info("[{}] Topic closed", topic);
-brokerService.pulsar().getExecutor().submit(() -> 
brokerService.removeTopicFromCache(topic));
-closeFuture.complete(null);
+// Unloading a topic iterates over the topics map; removing from the map on the same thread creates a deadlock.
+// So execute the removal on a different thread.
+brokerService.executor().execute(() -> {
+brokerService.removeTopicFromCache(topic);
+closeFuture.complete(null);
+});
 }).exceptionally(exception -> {
 log.error("[{}] Error closing topic", topic, exception);
 isFenced = false;


 




[GitHub] jai1 closed pull request #1714: Relocate service files for shading `pulsar-client-admin` module (#1370)

2018-05-02 Thread GitBox
jai1 closed pull request #1714: Relocate service files for shading 
`pulsar-client-admin` module (#1370)
URL: https://github.com/apache/incubator-pulsar/pull/1714
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index b4e63d7211..daa6c5a704 100644
--- a/pom.xml
+++ b/pom.xml
@@ -805,7 +805,7 @@ flexible messaging model and an intuitive client 
API.
 
   org.apache.maven.plugins
   maven-shade-plugin
-  2.4.2
+  3.1.0
 
 
   maven-enforcer-plugin
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index c5624fe022..f72243f2a8 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -185,6 +185,10 @@
   
org.apache.pulsar.admin.shade.org.reactivestreams
 
   
+  
+
+
+  
 
   
 
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 81f873e742..9219c1f718 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -160,6 +160,10 @@
   
org.apache.pulsar.shade.org.apache.http
 
   
+  
+
+
+  
 
   
 


 




[incubator-pulsar] branch branch-1.22 updated: Fix: NPE when cursor failed to close empty subscription

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new 5ce64ff  Fix: NPE when cursor failed to close empty subscription
5ce64ff is described below

commit 5ce64ff0f2196ad791b2e19c64772c11a63e0030
Author: Jai Asher 
AuthorDate: Wed May 2 01:27:53 2018 -0700

Fix: NPE when cursor failed to close empty subscription
---
 .../pulsar/broker/service/persistent/PersistentSubscription.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e742f14..5e6153b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -533,7 +533,9 @@ public class PersistentSubscription implements Subscription 
{
 disconnectFuture.complete(null);
 }).exceptionally(exception -> {
 IS_FENCED_UPDATER.set(this, FALSE);
-dispatcher.reset();
+if (dispatcher != null) {
+dispatcher.reset();
+}
 log.error("[{}][{}] Error disconnecting consumers from 
subscription", topicName, subName,
 exception);
 disconnectFuture.completeExceptionally(exception);
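The guard added in the commit above is a plain defensive null check: an empty subscription may never have created a dispatcher, so the error path must tolerate its absence. A minimal, hypothetical sketch of the same shape (names here are illustrative, not the Pulsar classes):

```java
public class NullGuard {
    // Stand-in for the subscription's dispatcher, which may never have
    // been created for an empty subscription.
    static Runnable dispatcher = null;

    static void onDisconnectFailure() {
        // Mirrors the fix: reset only if a dispatcher actually exists.
        if (dispatcher != null) {
            dispatcher.run();
        }
        System.out.println("handled failure without NPE");
    }

    public static void main(String[] args) {
        onDisconnectFailure();
    }
}
```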



[incubator-pulsar] branch branch-1.22 updated: Issue #1117: handle race in concurrent bundle split

2018-05-02 Thread jai1
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
 new dd5c425  Issue #1117: handle race in concurrent bundle split
dd5c425 is described below

commit dd5c425f9b9df0de4852a49a9d8f731020271320
Author: Jai Asher 
AuthorDate: Wed May 2 01:27:05 2018 -0700

Issue #1117: handle race in concurrent bundle split
---
 .../broker/cache/LocalZooKeeperCacheService.java   |  21 ++-
 .../pulsar/broker/namespace/NamespaceService.java  | 171 ++---
 .../pulsar/broker/service/BrokerService.java   |  13 +-
 .../common/naming/NamespaceBundleFactory.java  |  32 ++--
 .../pulsar/common/naming/NamespaceBundles.java |  18 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 115 ++
 .../broker/namespace/NamespaceServiceTest.java |  10 +-
 .../pulsar/zookeeper/ZooKeeperDataCache.java   |   8 +
 8 files changed, 306 insertions(+), 82 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 66a1ffa..4b28cad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -22,6 +22,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 
+import com.google.common.collect.Maps;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,6 +40,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,20 +82,28 @@ public class LocalZooKeeperCacheService {
 
 @Override
 public CompletableFuture getAsync(String 
path) {
-CompletableFuture future = new 
CompletableFuture<>();
+return getWithStatAsync(path).thenApply(entry -> entry.map(e 
-> e.getKey()));
+}
+
+@Override
+public CompletableFuture>> 
getWithStatAsync(String path) {
+CompletableFuture>> future 
= new CompletableFuture<>();
 
 // First check in local-zk cache
-super.getAsync(path).thenAccept(localPolicies -> {
+super.getWithStatAsync(path).thenAccept(result -> {
+Optional localPolicies = 
result.map(Entry::getKey);
 if (localPolicies.isPresent()) {
-future.complete(localPolicies);
+future.complete(result);
 } else {
// create new policies node under local ZK by copying it from global ZK
 createPolicies(path, true).thenAccept(p -> {
 LOG.info("Successfully created local policies for 
{} -- {}", path, p);
 // local-policies have been created but it's not 
part of policiesCache. so, call
 // super.getAsync() which will load it and set the 
watch on local-policies path
-super.getAsync(path);
-future.complete(p);
+super.getWithStatAsync(path);
+Stat stat = new Stat();
+stat.setVersion(-1);
+
future.complete(Optional.of(Maps.immutableEntry(p.orElse(null), stat)));
 }).exceptionally(ex -> {
 future.completeExceptionally(ex);
 return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e7b885a..b8e4ae0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -22,7 +22,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static 

[GitHub] jai1 closed pull request #1711: Fixed Lookup redirect logic on Proxy side

2018-05-02 Thread GitBox
jai1 closed pull request #1711: Fixed Lookup redirect logic on Proxy side
URL: https://github.com/apache/incubator-pulsar/pull/1711
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 7d4d683ba1..148a7a4893 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -125,7 +125,7 @@ private void performLookup(long clientRequestId, String 
topic, String brokerServ
 requestId).thenAccept(result -> {
 if (result.redirect) {
 // Need to try the lookup again on a different 
broker
-performLookup(clientRequestId, topic, 
result.brokerUrl, authoritative, numberOfRetries - 1);
+performLookup(clientRequestId, topic, 
result.brokerUrl, result.authoritative, numberOfRetries - 1);
 } else {
 // We have the result immediately
 String brokerUrl = connectWithTLS ? 
result.brokerUrlTls : result.brokerUrl;
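The one-line fix above matters because each redirect response carries its own authoritative flag, and the retry must propagate the broker's returned value rather than the caller's original one. A minimal, hypothetical sketch of the retry shape (the Result class and broker names are invented for illustration):

```java
public class LookupRetry {
    // Minimal stand-in for a broker lookup response.
    static class Result {
        final boolean redirect;
        final boolean authoritative;
        final String brokerUrl;

        Result(boolean redirect, boolean authoritative, String brokerUrl) {
            this.redirect = redirect;
            this.authoritative = authoritative;
            this.brokerUrl = brokerUrl;
        }
    }

    static String lookup(String broker, boolean authoritative, int retries) {
        Result result = query(broker, authoritative);
        if (result.redirect && retries > 0) {
            // The fix: pass result.authoritative (what the broker returned),
            // not the caller's original authoritative value.
            return lookup(result.brokerUrl, result.authoritative, retries - 1);
        }
        return result.brokerUrl;
    }

    static Result query(String broker, boolean authoritative) {
        // Fake a single redirect hop for demonstration.
        return broker.equals("broker-a")
                ? new Result(true, true, "broker-b")
                : new Result(false, authoritative, broker);
    }

    public static void main(String[] args) {
        System.out.println(lookup("broker-a", false, 3));
    }
}
```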


 



