[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185673143 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { +@Data +class MockLedgerEntry implements LedgerEntry { +public byte blockPadding = 0xB; +long ledgerId; +long entryId; +long length; +byte entryBytes[]; +ByteBuf entryBuffer; + +MockLedgerEntry(long ledgerId, long entryId, long length) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.length = length; +this.entryBytes = new byte[(int)length]; +entryBuffer = Unpooled.wrappedBuffer(entryBytes); +entryBuffer.writerIndex(0); +IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); +} + +@Override +public ByteBuffer getEntryNioBuffer() { +return null; +} + +@Override +public 
LedgerEntry duplicate() { +return null; +} + +@Override +public void close() { +entryBuffer.release(); +} +} + +@Data +class MockLedgerEntries implements LedgerEntries { +int ledgerId; +int startEntryId; +int count; +int entrySize; +List entries; + +MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { +this.ledgerId = ledgerId; +this.startEntryId = startEntryId; +this.count = count; +this.entrySize = entrySize; +this.entries = Lists.newArrayList(count); + +IntStream.range(startEntryId, startEntryId + count).forEach(i -> +entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); +} + +@Override +public void close() { +entries.clear(); +} + +@Override +public LedgerEntry getEntry(long entryId) { +if (entryId < startEntryId || entryId >= startEntryId + count) { +return null; +} + +return entries.get(((int)entryId - startEntryId)); +} + +@Override +public Iterator iterator() { +return entries.iterator(); +} +} + +class MockReadHandle implements ReadHandle { +int ledgerId; +int entrySize; +int lac; +MockReadHandle(int ledgerId, int entrySize, int lac) { +this.ledgerId = ledgerId; +this.entrySize = entrySize; +this.lac = lac; +
[GitHub] lucperkins commented on issue #1457: Schema registry documentation
lucperkins commented on issue #1457: Schema registry documentation URL: https://github.com/apache/incubator-pulsar/pull/1457#issuecomment-386153232 @mgodave This is ready for review This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
merlimat closed pull request #1709: Bumped version to 2.1.0-incubating-SNAPSHOT URL: https://github.com/apache/incubator-pulsar/pull/1709 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/all/pom.xml b/all/pom.xml index 5251faec38..7ac9f54c2a 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. diff --git a/buildtools/pom.xml b/buildtools/pom.xml index d83bb2c904..100a9c7a98 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -30,7 +30,7 @@ org.apache.pulsar buildtools - 2.0.0-incubating-SNAPSHOT + 2.1.0-incubating-SNAPSHOT jar Pulsar Build Tools diff --git a/docker/grafana/pom.xml b/docker/grafana/pom.xml index 450eb827d5..0eaf5c8422 100644 --- a/docker/grafana/pom.xml +++ b/docker/grafana/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/docker/pom.xml b/docker/pom.xml index ec4f8ca572..70f4480661 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT org.apache.pulsar docker-images diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml index 63c42ca3d8..3cd9fce76b 100644 --- a/docker/pulsar/pom.xml +++ b/docker/pulsar/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 3ee7703994..870ed6a057 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. 
diff --git a/pom.xml b/pom.xml index 761c710fd2..c18a320682 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ org.apache.pulsar pulsar - 2.0.0-incubating-SNAPSHOT + 2.1.0-incubating-SNAPSHOT Pulsar Pulsar is a distributed pub-sub messaging platform with a very diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml index 48e78e87ac..c163c10f4a 100644 --- a/pulsar-broker-auth-athenz/pom.xml +++ b/pulsar-broker-auth-athenz/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT pulsar-broker-auth-athenz diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 75edadbe82..bb5a639840 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT pulsar-broker-common diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index 5e77a95d5d..181d822084 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 2e1f22080a..0c9fe462f6 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-admin-shaded-for-functions/pom.xml b/pulsar-client-admin-shaded-for-functions/pom.xml index 1e3833387f..444c4bfc52 100644 --- a/pulsar-client-admin-shaded-for-functions/pom.xml +++ b/pulsar-client-admin-shaded-for-functions/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. 
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index df5f867fa6..ba8da01196 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index e0665b1179..702a29e246 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.0.0-incubating-SNAPSHOT +2.1.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml index e7df0fc482..5acce5f831 100644 --- a/pulsar-client-auth-athenz/pom.xml +++ b/pulsar-client-auth-athenz/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -
[incubator-pulsar] branch master updated: Bumped version to 2.1.0-incubating-SNAPSHOT (#1709)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new efc893b Bumped version to 2.1.0-incubating-SNAPSHOT (#1709) efc893b is described below commit efc893b6997f6181e9cd785c9fd023d7632736ab Author: Matteo MerliAuthorDate: Wed May 2 16:35:54 2018 -0700 Bumped version to 2.1.0-incubating-SNAPSHOT (#1709) --- all/pom.xml | 2 +- buildtools/pom.xml| 2 +- docker/grafana/pom.xml| 2 +- docker/pom.xml| 2 +- docker/pulsar/pom.xml | 2 +- managed-ledger/pom.xml| 2 +- pom.xml | 2 +- pulsar-broker-auth-athenz/pom.xml | 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker-shaded/pom.xml | 2 +- pulsar-broker/pom.xml | 2 +- pulsar-client-admin-shaded-for-functions/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml| 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml | 2 +- pulsar-client-kafka-compat/pom.xml| 2 +- pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml| 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools-test/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml | 2 +- pulsar-common/pom.xml | 2 +- pulsar-connect/aerospike/pom.xml | 2 +- pulsar-connect/cassandra/pom.xml | 2 +- pulsar-connect/core/pom.xml | 2 +- pulsar-connect/kafka/pom.xml | 2 +- pulsar-connect/pom.xml| 2 +- pulsar-connect/rabbitmq/pom.xml | 2 +- pulsar-connect/twitter/pom.xml| 2 +- pulsar-discovery-service/pom.xml | 2 +- pulsar-functions/api-java/pom.xml | 2 +- pulsar-functions/instance/pom.xml | 2 +- pulsar-functions/java-examples/pom.xml| 2 +- pulsar-functions/metrics/pom.xml | 2 +- pulsar-functions/pom.xml | 2 +- pulsar-functions/proto-shaded/pom.xml | 2 +- pulsar-functions/proto/pom.xml| 2 +- 
pulsar-functions/runtime-all/pom.xml | 2 +- pulsar-functions/runtime-shaded/pom.xml | 2 +- pulsar-functions/runtime/pom.xml | 2 +- pulsar-functions/utils/pom.xml| 2 +- pulsar-functions/worker-shaded/pom.xml| 2 +- pulsar-functions/worker/pom.xml | 2 +- pulsar-log4j2-appender/pom.xml| 2 +- pulsar-proxy/pom.xml | 2 +- pulsar-spark/pom.xml | 2 +- pulsar-storm/pom.xml | 2 +- pulsar-testclient/pom.xml | 2 +- pulsar-websocket/pom.xml | 2 +- pulsar-zookeeper-utils/pom.xml| 2 +- pulsar-zookeeper/pom.xml | 2 +- tests/docker-images/latest-version-image/pom.xml | 2 +- tests/docker-images/pom.xml | 2 +- tests/integration-tests-base/pom.xml | 2 +- tests/integration-tests-topologies/pom.xml| 2 +- tests/integration-tests-utils/pom.xml | 2 +- tests/integration/cli/pom.xml | 2 +- tests/integration/compaction/pom.xml | 2 +- tests/integration/pom.xml | 2 +- tests/integration/smoke/pom.xml | 2 +- tests/pom.xml | 2 +- 63 files changed, 63 insertions(+), 63
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185667124 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -259,13 +295,34 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect connect) { clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod); LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, clientAuthRole); +if (service.getConfiguration().forwardAuthorizationCredentials()) { +this.clientAuthData = authData; +this.clientAuthMethod = authMethod; +} +createClient(clientConf, this.clientAuthData, this.clientAuthMethod); Review comment: As the name suggests, can we create and return the client in `createClient()`, i.e. `this.client = createClient(..)`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185666025 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java ## @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.EventLoopGroup; + +public class ProxyClientCnx extends ClientCnx { + + String clientAuthRole; + String clientAuthData; + String clientAuthMethod; + + public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, + String clientAuthData, String clientAuthMethod) { + super(conf, eventLoopGroup); + this.clientAuthRole = clientAuthRole; + this.clientAuthData = clientAuthData; + this.clientAuthMethod = clientAuthMethod; + } + + @Override + protected ByteBuf newConnectCommand() throws PulsarClientException { + if (log.isDebugEnabled()) { +log.info( +"New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}", +clientAuthRole, clientAuthData, clientAuthMethod); + } + String authData = ""; Review comment: can we initialize with null? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668132 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java ## @@ -65,15 +60,10 @@ private ZooKeeperClientFactory zkClientFactory = null; private final EventLoopGroup acceptorGroup; -private final EventLoopGroup workerGroup; +final EventLoopGroup workerGroup; Review comment: can we keep it private? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185666469 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -229,14 +245,39 @@ protected void handleLookup(CommandLookupTopic lookup) { private void close() { state = State.Closed; ctx.close(); +try { +client.close(); +} catch (PulsarClientException e) { +LOG.error("Unable to clode pulsar client - {}", client); Review comment: typo: should be `close`. Also add `e.getMessage()` to the log. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185667623 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -259,13 +295,34 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect connect) { clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod); LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, clientAuthRole); +if (service.getConfiguration().forwardAuthorizationCredentials()) { +this.clientAuthData = authData; +this.clientAuthMethod = authMethod; +} +createClient(clientConf, this.clientAuthData, this.clientAuthMethod); Review comment: Also, we don't want to share the proxy-to-broker connection when `service.getConfiguration().forwardAuthorizationCredentials()` is enabled; otherwise, we can always share the proxy-to-broker connection for lookup. So, should we create a new client only when `service.getConfiguration().forwardAuthorizationCredentials()=true`, and share it in all other cases? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668092 ## File path: pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java ## @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.Set; + +import javax.naming.AuthenticationException; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class ProxyAuthenticationTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class); + + public static class BasicAuthenticationData implements AuthenticationDataProvider { + private 
String authParam; + + public BasicAuthenticationData(String authParam) { + this.authParam = authParam; + } + + public boolean hasDataFromCommand() { + return true; + } + + public String getCommandData() { + return authParam; + } + + public boolean hasDataForHttp() { + return true; + } + + @Override + public Set> getHttpHeaders() { + Map headers = new HashMap<>(); + headers.put("BasicAuthentication", authParam); + return headers.entrySet(); + } + } + + public static class BasicAuthentication implements Authentication { + + private String authParam; + + @Override + public void close() throws IOException { + // noop + } + + @Override + public String getAuthMethodName() { + return "BasicAuthentication"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + try { + return new BasicAuthenticationData(authParam); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public void configure(Map authParams) { + this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}", +
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185668018 ## File path: pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java ## @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.Set; + +import javax.naming.AuthenticationException; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class ProxyAuthenticationTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class); + + public static class BasicAuthenticationData implements AuthenticationDataProvider { + private 
String authParam; + + public BasicAuthenticationData(String authParam) { + this.authParam = authParam; + } + + public boolean hasDataFromCommand() { + return true; + } + + public String getCommandData() { + return authParam; + } + + public boolean hasDataForHttp() { + return true; + } + + @Override + public Set> getHttpHeaders() { + Map headers = new HashMap<>(); + headers.put("BasicAuthentication", authParam); + return headers.entrySet(); + } + } + + public static class BasicAuthentication implements Authentication { + + private String authParam; + + @Override + public void close() throws IOException { + // noop + } + + @Override + public String getAuthMethodName() { + return "BasicAuthentication"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + try { + return new BasicAuthenticationData(authParam); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public void configure(Map authParams) { + this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}", +
[GitHub] rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy
rdhabalia commented on a change in pull request #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#discussion_r185665960 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java ## @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.EventLoopGroup; + +public class ProxyClientCnx extends ClientCnx { + + String clientAuthRole; + String clientAuthData; + String clientAuthMethod; + + public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, + String clientAuthData, String clientAuthMethod) { + super(conf, eventLoopGroup); + this.clientAuthRole = clientAuthRole; + this.clientAuthData = clientAuthData; + this.clientAuthMethod = clientAuthMethod; + } + + @Override + protected ByteBuf newConnectCommand() throws PulsarClientException { + if (log.isDebugEnabled()) { +log.info( Review comment: `log.debug(..)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185665085 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { Review comment: OK. The current code tries to release buffers earlier, and makes each call of `readEntries` do less work. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
sijie commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT URL: https://github.com/apache/incubator-pulsar/pull/1709#issuecomment-386132512 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1719: Used shaded version of protobuf-2.4.1 internally, to coexist with components using proto3
sijie commented on a change in pull request #1719: Used shaded version of protobuf-2.4.1 internally, to coexist with components using proto3 URL: https://github.com/apache/incubator-pulsar/pull/1719#discussion_r185649007 ## File path: protobuf-shaded/pom.xml ## @@ -0,0 +1,76 @@ + + +
[incubator-pulsar] branch master updated: Version links in release notes (#1721)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 630e119 Version links in release notes (#1721) 630e119 is described below commit 630e1190255a14b483a651a0d248a5a0f6ac40aa Author: Luc PerkinsAuthorDate: Wed May 2 14:44:26 2018 -0700 Version links in release notes (#1721) This enables people to easily link to release notes for specific versions. In the future, this page should be templated and built from source files rather than raw Markdown, but for now this provides an easy stopgap solution. --- site/release-notes.md | 52 +-- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/site/release-notes.md b/site/release-notes.md index 629851a..4e865b8 100644 --- a/site/release-notes.md +++ b/site/release-notes.md @@ -26,7 +26,7 @@ layout: content ## Apache incubator -### 1.22.0-incubating 2018-03-06 +### 1.22.0-incubating 2018-03-06 This is the fourth of Apache Pulsar since entering the ASF incubator. @@ -71,7 +71,7 @@ https://github.com/apache/incubator-pulsar/milestone/11?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating -### 1.21.0-incubating 2017-12-17 +### 1.21.0-incubating 2017-12-17 This is the third of Apache Pulsar since entering the ASF incubator. @@ -93,7 +93,7 @@ https://github.com/apache/incubator-pulsar/milestone/10?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.21.0-incubating -### 1.20.0-incubating 2017-08-08 +### 1.20.0-incubating 2017-08-08 This is the second of Apache Pulsar since entering the ASF incubator. 
@@ -111,7 +111,7 @@ https://github.com/apache/incubator-pulsar/milestone/9?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.20.0-incubating -### 1.19.0-incubating 2017-08-08 +### 1.19.0-incubating 2017-08-08 This is the first of Apache Pulsar since entering the ASF incubator. @@ -131,7 +131,7 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.19.0-incubating ## Pre-Apache -### 1.18 2017-06-17 +### 1.18 2017-06-17 Main changes: * [#325](https://github.com/apache/incubator-pulsar/pull/325) Add Modular load manager documentation @@ -172,7 +172,7 @@ Full list of changes: https://github.com/yahoo/pulsar/milestone/7?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.18 -### 1.17.5 2017-05-02 +### 1.17.5 2017-05-02 * [#343](https://github.com/apache/incubator-pulsar/pull/343) Fix ModularLoadManager to select broker from current available-broker list * [#384](https://github.com/apache/incubator-pulsar/pull/384) Fix Send replay entries read callback from background thread, to avoid recursive stack calls @@ -181,7 +181,7 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.18 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5 -### 1.17.4 2017-04-25 +### 1.17.4 2017-04-25 * [#362](https://github.com/apache/incubator-pulsar/pull/362) Fix add timeout on blocking ZookeeperCache get call * [#375](https://github.com/apache/incubator-pulsar/pull/375) Fix possible deal lock on topic loading if broker fails to get MLConfiguration from zk @@ -189,13 +189,13 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.4 -### 1.17.3 2017-04-20 +### 1.17.3 2017-04-20 * [#367](https://github.com/apache/incubator-pulsar/pull/367) Fix dispatcher correctly finds available consumer from list of shared-subscription consumers https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3 -### 1.17.2 2017-04-06 +### 1.17.2 2017-04-06 * 
[#327](https://github.com/apache/incubator-pulsar/pull/327) Create znode for dynamic configuration if not present * [#336](https://github.com/apache/incubator-pulsar/pull/336) Fix prevent creation of topic when bundle is disable @@ -203,13 +203,13 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.2 -### 1.17.1 2017-03-30 +### 1.17.1 2017-03-30 * [#326](https://github.com/apache/incubator-pulsar/pull/326) Fix memory leak while duplicating entry data from existing entry https://github.com/apache/incubator-pulsar/releases/tag/v1.17.1 -### 1.17 2017-03-30 +### 1.17 2017-03-30 Main changes: @@ -239,13 +239,13 @@ Full list of changes: https://github.com/apache/incubator-pulsar/milestone/3?clo https://github.com/apache/incubator-pulsar/releases/tag/v1.17 -### 1.16.5 2017-03-10 +### 1.16.5 2017-03-10 * [#311](https://github.com/apache/incubator-pulsar/pull/311) Exclude netty individual jars from binary distribution. This issue was causing binary
[GitHub] sijie closed pull request #1720: Fix broken image
sijie closed pull request #1720: Fix broken image URL: https://github.com/apache/incubator-pulsar/pull/1720 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md index 817b3eb6f0..2efabeaf8c 100644 --- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md +++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md @@ -147,7 +147,7 @@ When the master consumer disconnects, all (non-acked and subsequent) messages wi In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 would be the next in line to receive messages if Consumer-C-2 disconnected. -{% include figure.html src="/img/failover-subscriptions.png" alt="Shared subscriptions" width="80" %} +{% include figure.html src="/img/failover-subscriptions.png" alt="Failover subscriptions" width="80" %} ### Multi-topic subscriptions diff --git a/site/img/failover-subscriptions.png b/site/img/failover-subscriptions.png new file mode 100644 index 00..2cf83fc1c5 Binary files /dev/null and b/site/img/failover-subscriptions.png differ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Fix broken image (#1720)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e9f8d9d Fix broken image (#1720) e9f8d9d is described below commit e9f8d9d7b37721a3c27ffc5264663dbccfafcd75 Author: Luc Perkins AuthorDate: Wed May 2 14:44:54 2018 -0700 Fix broken image (#1720) * add missing image * update image * one more small change to image --- .../latest/getting-started/ConceptsAndArchitecture.md | 2 +- site/img/failover-subscriptions.png | Bin 0 -> 64604 bytes 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md index 817b3eb..2efabea 100644 --- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md +++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md @@ -147,7 +147,7 @@ When the master consumer disconnects, all (non-acked and subsequent) messages wi In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 would be the next in line to receive messages if Consumer-C-2 disconnected. -{% include figure.html src="/img/failover-subscriptions.png" alt="Shared subscriptions" width="80" %} +{% include figure.html src="/img/failover-subscriptions.png" alt="Failover subscriptions" width="80" %} ### Multi-topic subscriptions diff --git a/site/img/failover-subscriptions.png b/site/img/failover-subscriptions.png new file mode 100644 index 000..2cf83fc Binary files /dev/null and b/site/img/failover-subscriptions.png differ -- To stop receiving notification emails like this one, please contact si...@apache.org.
[GitHub] sijie closed pull request #1721: Version links in release notes
sijie closed pull request #1721: Version links in release notes URL: https://github.com/apache/incubator-pulsar/pull/1721 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site/release-notes.md b/site/release-notes.md index 629851a0b9..4e865b85cd 100644 --- a/site/release-notes.md +++ b/site/release-notes.md @@ -26,7 +26,7 @@ layout: content ## Apache incubator -### 1.22.0-incubating 2018-03-06 +### 1.22.0-incubating 2018-03-06 This is the fourth of Apache Pulsar since entering the ASF incubator. @@ -71,7 +71,7 @@ https://github.com/apache/incubator-pulsar/milestone/11?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating -### 1.21.0-incubating 2017-12-17 +### 1.21.0-incubating 2017-12-17 This is the third of Apache Pulsar since entering the ASF incubator. @@ -93,7 +93,7 @@ https://github.com/apache/incubator-pulsar/milestone/10?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.21.0-incubating -### 1.20.0-incubating 2017-08-08 +### 1.20.0-incubating 2017-08-08 This is the second of Apache Pulsar since entering the ASF incubator. @@ -111,7 +111,7 @@ https://github.com/apache/incubator-pulsar/milestone/9?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.20.0-incubating -### 1.19.0-incubating 2017-08-08 +### 1.19.0-incubating 2017-08-08 This is the first of Apache Pulsar since entering the ASF incubator. 
@@ -131,7 +131,7 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.19.0-incubating ## Pre-Apache -### 1.18 2017-06-17 +### 1.18 2017-06-17 Main changes: * [#325](https://github.com/apache/incubator-pulsar/pull/325) Add Modular load manager documentation @@ -172,7 +172,7 @@ Full list of changes: https://github.com/yahoo/pulsar/milestone/7?closed=1 https://github.com/apache/incubator-pulsar/releases/tag/v1.18 -### 1.17.5 2017-05-02 +### 1.17.5 2017-05-02 * [#343](https://github.com/apache/incubator-pulsar/pull/343) Fix ModularLoadManager to select broker from current available-broker list * [#384](https://github.com/apache/incubator-pulsar/pull/384) Fix Send replay entries read callback from background thread, to avoid recursive stack calls @@ -181,7 +181,7 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.18 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5 -### 1.17.4 2017-04-25 +### 1.17.4 2017-04-25 * [#362](https://github.com/apache/incubator-pulsar/pull/362) Fix add timeout on blocking ZookeeperCache get call * [#375](https://github.com/apache/incubator-pulsar/pull/375) Fix possible deal lock on topic loading if broker fails to get MLConfiguration from zk @@ -189,13 +189,13 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.17.5 https://github.com/apache/incubator-pulsar/releases/tag/v1.17.4 -### 1.17.3 2017-04-20 +### 1.17.3 2017-04-20 * [#367](https://github.com/apache/incubator-pulsar/pull/367) Fix dispatcher correctly finds available consumer from list of shared-subscription consumers https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3 -### 1.17.2 2017-04-06 +### 1.17.2 2017-04-06 * [#327](https://github.com/apache/incubator-pulsar/pull/327) Create znode for dynamic configuration if not present * [#336](https://github.com/apache/incubator-pulsar/pull/336) Fix prevent creation of topic when bundle is disable @@ -203,13 +203,13 @@ https://github.com/apache/incubator-pulsar/releases/tag/v1.17.3 
https://github.com/apache/incubator-pulsar/releases/tag/v1.17.2 -### 1.17.1 2017-03-30 +### 1.17.1 2017-03-30 * [#326](https://github.com/apache/incubator-pulsar/pull/326) Fix memory leak while duplicating entry data from existing entry https://github.com/apache/incubator-pulsar/releases/tag/v1.17.1 -### 1.17 2017-03-30 +### 1.17 2017-03-30 Main changes: @@ -239,13 +239,13 @@ Full list of changes: https://github.com/apache/incubator-pulsar/milestone/3?clo https://github.com/apache/incubator-pulsar/releases/tag/v1.17 -### 1.16.5 2017-03-10 +### 1.16.5 2017-03-10 * [#311](https://github.com/apache/incubator-pulsar/pull/311) Exclude netty individual jars from binary distribution. This issue was causing binary distribution to have conflicting netty dependencies. https://github.com/apache/incubator-pulsar/releases/tag/v1.16.5 -### 1.16.4 2017-03-10 +### 1.16.4 2017-03-10 * [#265](https://github.com/apache/incubator-pulsar/pull/265) Fix client closes http-connection on internal-server error * [#283](https://github.com/apache/incubator-pulsar/pull/283) Fix recycle keep alive command-object properly @@ -253,19 +253,19 @@
[incubator-pulsar] branch master updated: Refactor functions to use Sink interface (#1708)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new c3c8013 Refactor functions to use Sink interface (#1708) c3c8013 is described below commit c3c8013d701d986970ec9baef142c338233f2f89 Author: Boyang Jerry PengAuthorDate: Wed May 2 14:43:10 2018 -0700 Refactor functions to use Sink interface (#1708) * use pulsar sink * removing exception from interface * addressing various comments * changing MultiConsumersOneOuputTopicProducers to use String for partition id --- .../java/org/apache/pulsar/connect/core/Sink.java | 2 +- .../functions/instance/JavaInstanceRunnable.java | 212 -- .../instance/processors/AtLeastOnceProcessor.java | 77 --- .../instance/processors/AtMostOnceProcessor.java | 79 --- .../processors/EffectivelyOnceProcessor.java | 120 -- .../instance/processors/MessageProcessor.java | 107 - .../instance/processors/MessageProcessorBase.java | 155 - .../MultiConsumersOneOuputTopicProducers.java | 20 +- .../functions/instance/producers/Producers.java| 4 +- .../pulsar/functions/sink/DefaultRuntimeSink.java | 1 - .../apache/pulsar/functions/sink/PulsarSink.java | 247 - .../PulsarSinkConfig.java} | 19 +- .../apache/pulsar/functions/sink/RuntimeSink.java | 5 +- .../pulsar/functions/source/PulsarSource.java | 30 +-- .../{PulsarConfig.java => PulsarSourceConfig.java} | 9 +- .../src/main/python/python_instance_main.py| 16 +- .../instance/JavaInstanceRunnableTest.java | 81 --- .../MultiConsumersOneOutputTopicProducersTest.java | 2 +- .../functions/sink/DefaultRuntimeSinkTest.java | 6 +- .../PulsarSinkTest.java} | 136 +--- .../pulsar/functions/source/PulsarSourceTest.java | 90 +++- .../pulsar/functions/runtime/JavaInstanceMain.java | 40 ++-- .../pulsar/functions/runtime/ProcessRuntime.java | 35 ++- .../functions/runtime/ProcessRuntimeTest.java | 20 +- 24 files changed, 683 
insertions(+), 830 deletions(-) diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java index ca569e7..cd2d63d 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -46,4 +46,4 @@ public interface Sink extends AutoCloseable { * @return Completable future fo async publish request */ CompletableFuture write(T value); -} \ No newline at end of file +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ca6414d..2e8037b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -22,15 +22,13 @@ package org.apache.pulsar.functions.instance; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; +import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import java.util.Arrays; -import java.util.Base64; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.AccessLevel; import lombok.Getter; @@ -49,18 +47,23 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; import 
org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import
[GitHub] sijie closed pull request #1708: Refactor functions to use Sink interface
sijie closed pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java index ca569e794c..cd2d63d242 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -46,4 +46,4 @@ * @return Completable future fo async publish request */ CompletableFuture write(T value); -} \ No newline at end of file +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ca6414dc7a..2e8037b814 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -22,15 +22,13 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; +import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import java.util.Arrays; -import java.util.Base64; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.AccessLevel; import lombok.Getter; @@ -49,18 +47,23 @@ import org.apache.logging.log4j.core.LoggerContext; import 
org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.sink.PulsarSinkConfig; +import org.apache.pulsar.functions.sink.RuntimeSink; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.source.PulsarSource; +import org.apache.pulsar.functions.source.PulsarSourceConfig; +import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; -import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; @@ -93,18 +96,14 @@ @Getter private Exception deathException; -@Getter(AccessLevel.PACKAGE) -private SerDe outputSerDe; - -@Getter(AccessLevel.PACKAGE) -// processor -private final MessageProcessor processor; - // function stats private final FunctionStats stats; private Record currentRecord; +private Source source; +private RuntimeSink sink; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -116,9 +115,6 @@ public 
JavaInstanceRunnable(InstanceConfig instanceConfig, this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; this.stats = new FunctionStats(); -this.processor = MessageProcessor.create( -client, -instanceConfig.getFunctionDetails()); } /** @@ -151,19 +147,16 @@ JavaInstance setupJavaInstance() throws Exception { typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); } -// setup serde -setupSerDe(typeArgs, clsLoader); - // start the state table setupStateTable(); // start the output producer -processor.setupOutput(outputSerDe); +setupOutput(typeArgs[1]); // start the input consumer -
[incubator-pulsar] annotated tag v1.22.1-incubating-candidate-1 updated (741174e -> 3151f61)
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a change to annotated tag v1.22.1-incubating-candidate-1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git. *** WARNING: tag v1.22.1-incubating-candidate-1 was modified! *** from 741174e (commit) to 3151f61 (tag) tagging 741174e6b0a46592a46fe2944ef292f08bffa347 (commit) replaces v1.22.0-incubating by jai1 on Wed May 2 13:30:43 2018 -0700 - Log - Release v1.22.1-incubating-candidate-1 -BEGIN PGP SIGNATURE- iQJEBAABCAAuFiEEDQCP4t9TLRC/fG0se6GmTLvBFOwFAlrqH/MQHGphaTFAYXBh Y2hlLm9yZwAKCRB7oaZMu8EU7HnnEACnKg275Ih7aXXK5u6L6qbMj5ELicjWxB8e EF4My0e7+H1KyRTez1PnMmCMel4w3twFUjSg4VMbyh9N9ts36Zz9kC4NfecP1xPI NXR9QG1DxKwdK3OTfFnZQO4H7KDV0F9O20jNiw2QiVX8gitEf4cQPT3RX1Djcbfm 2/TlkZheaBQay/lDUayU4VTjnTG8aJZ5ZSEwy+pC1n6sgZXq0YN60afj5eDXet2o tRuOSazGmxtBJKk6PVC7pCOgUEyYVxUEFcH8qn67P5Gdu4s4HUL3gxVrXSrR9Zd0 9ne49yReU4+XGhW+nIg0kenVvl5+Yezt+vD2yqvIFbQxgEZHKCjaVk9XjnCa42js /592iryYDiwDulefkWhybH2bg1E17szCQiN3gLKAn51Bc90nY/V5Hy752U2S7IgE W4zgDnA8LgMKR9Dr2i0fv+JlO59Ki2meBpDBjH09eYseidVc0a2Dh+xsoQvWIkc7 qn7QD9ElbMw6LO2R1n13g9Cq89kMIplr8T4N3WO4WxLa2ILgqZPAnABQPW42tIIZ JDrLEzfvmF1WXM43mQV4a/3SWPS7VaP/mUCFh2n7bwX+KKaWiYO21M9IWvymghql 8J9iA7wXF0Q0pP7mF6JR9Cswtsl+ZlrGC46qXel9cUAPXbeSv6PaV3sj5RQQtpuC c9s4uY2U/Q== =Q5kY -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes: -- To stop receiving notification emails like this one, please contact j...@apache.org.
[incubator-pulsar] branch branch-1.22 updated: Release 1.22.1-incubating
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 741174e Release 1.22.1-incubating 741174e is described below commit 741174e6b0a46592a46fe2944ef292f08bffa347 Author: jai1AuthorDate: Wed May 2 13:29:23 2018 -0700 Release 1.22.1-incubating --- all/pom.xml | 2 +- buildtools/pom.xml | 2 +- managed-ledger/pom.xml | 2 +- pom.xml | 2 +- pulsar-broker-auth-athenz/pom.xml| 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker-shaded/pom.xml | 2 +- pulsar-broker/pom.xml| 2 +- pulsar-checksum/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml | 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml| 2 +- pulsar-client-kafka-compat/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml | 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml| 2 +- pulsar-common/pom.xml| 2 +- pulsar-discovery-service/pom.xml | 2 +- pulsar-proxy/pom.xml | 2 +- pulsar-spark/pom.xml | 2 +- pulsar-storm/pom.xml | 2 +- pulsar-testclient/pom.xml| 2 +- pulsar-websocket/pom.xml | 2 +- pulsar-zookeeper-utils/pom.xml | 2 +- pulsar-zookeeper/pom.xml | 2 +- 27 files changed, 27 insertions(+), 27 deletions(-) diff --git a/all/pom.xml b/all/pom.xml index 4283f55..5f7ae07 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. diff --git a/buildtools/pom.xml b/buildtools/pom.xml index 11a3bd9..2e42b9b 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. 
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 0cdff4a..f4bf035 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. diff --git a/pom.xml b/pom.xml index daa6c5a..16b1eca 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ org.apache.pulsar pulsar - 1.22.0-incubating + 1.22.1-incubating Pulsar Pulsar is a distributed pub-sub messaging platform with a very diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml index 0e5f6b1..3a24bd2 100644 --- a/pulsar-broker-auth-athenz/pom.xml +++ b/pulsar-broker-auth-athenz/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating pulsar-broker-auth-athenz diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index b65323f..940668a 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating pulsar-broker-common diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index 6d552bf..efc43c5 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index f5c6528..868e1f4 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml index ff0a63a..837286b 100644 --- a/pulsar-checksum/pom.xml +++ b/pulsar-checksum/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating +1.22.1-incubating .. diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index f72243f..c3c3928 100644 ---
[GitHub] lucperkins opened a new pull request #1721: Version links in release notes
lucperkins opened a new pull request #1721: Version links in release notes URL: https://github.com/apache/incubator-pulsar/pull/1721 This enables people to easily link to release notes for specific versions. In the future, this page should be templated and built from source files rather than raw Markdown, but for now this provides an easy stopgap solution. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 closed pull request #1715: Broker should not start replicator for root partitioned-topic
jai1 closed pull request #1715: Broker should not start replicator for root partitioned-topic URL: https://github.com/apache/incubator-pulsar/pull/1715 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1913dd5ff3..d5480ccc8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,17 +18,22 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.DestinationName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +62,9 @@ Stopped, Starting, Started, Stopping } -public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, -String remoteCluster, BrokerService brokerService) { +public AbstractReplicator(String topicName, 
String replicatorPrefix, String localCluster, String remoteCluster, +BrokerService brokerService) throws NamingException { +validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; @@ -67,7 +73,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerConfiguration = new ProducerConfiguration(); this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS); this.producerConfiguration.setMaxPendingMessages(producerQueueSize); @@ -214,5 +219,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster) return (replicatorPrefix + "." + cluster).intern(); } +/** + * Replication can't be started on root-partitioned-topic to avoid producer startup conflict. + * + * + * eg: + * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then + * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2". + * + * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual + * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing + * replicator producers. + * + * + * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned + * producers. 
+ * + * @param topicName + * @param brokerService + */ +private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException { +DestinationName destination = DestinationName.get(topicName); +String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, +destination.getNamespace().toString(), destination.getDomain().toString(), +destination.getEncodedLocalName()); +boolean isPartitionedTopic = false; +try { +isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache() +.get(partitionedTopicPath).isPresent(); +} catch (Exception e) { +log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage()); +} +if (isPartitionedTopic) { +throw new NamingException( +topicName + " is a partitioned-topic and replication can't be started for partitioned-producer "); +} +} + private
[incubator-pulsar] branch branch-1.22 updated: Broker should not start replicator for root partitioned-topic (#1715)
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new ca7559b Broker should not start replicator for root partitioned-topic (#1715) ca7559b is described below commit ca7559bd4e54eff3b3be8ba01263805da162bee0 Author: Jai AsherAuthorDate: Wed May 2 13:15:24 2018 -0700 Broker should not start replicator for root partitioned-topic (#1715) --- .../pulsar/broker/service/AbstractReplicator.java | 48 ++- .../pulsar/broker/service/BrokerService.java | 53 +--- .../nonpersistent/NonPersistentReplicator.java | 3 +- .../service/nonpersistent/NonPersistentTopic.java | 43 - .../service/persistent/PersistentReplicator.java | 3 +- .../broker/service/persistent/PersistentTopic.java | 39 ++-- .../pulsar/broker/service/ReplicatorTest.java | 72 ++ .../pulsar/broker/service/ReplicatorTestBase.java | 3 + 8 files changed, 216 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1913dd5..d5480cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,17 +18,22 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.DestinationName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +62,9 @@ public abstract class AbstractReplicator { Stopped, Starting, Started, Stopping } -public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, -String remoteCluster, BrokerService brokerService) { +public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, +BrokerService brokerService) throws NamingException { +validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; @@ -67,7 +73,6 @@ public abstract class AbstractReplicator { this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerConfiguration = new ProducerConfiguration(); this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS); this.producerConfiguration.setMaxPendingMessages(producerQueueSize); @@ -214,5 +219,42 @@ public abstract class AbstractReplicator { return (replicatorPrefix + "." + cluster).intern(); } +/** + * Replication can't be started on root-partitioned-topic to avoid producer startup conflict. + * + * + * eg: + * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then + * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2". 
+ * + * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual + * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing + * replicator producers. + * + * + * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned + * producers. + * + * @param topicName + * @param brokerService + */ +private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException { +DestinationName destination = DestinationName.get(topicName); +String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, +
[incubator-pulsar] branch branch-1.22 updated (1875841 -> 5484585)
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a change to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git. from 1875841 Fix: handle invalid markDelete position at managed-cursor (#1718) add 5484585 Fix: deadlock while closing non-shared consumer (#1716) No new revisions were added by this update. Summary of changes: .../pulsar/broker/service/persistent/PersistentSubscription.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- To stop receiving notification emails like this one, please contact j...@apache.org.
[GitHub] jai1 closed pull request #1716: Fix: deadlock while closing non-shared consumer
jai1 closed pull request #1716: Fix: deadlock while closing non-shared consumer URL: https://github.com/apache/incubator-pulsar/pull/1716 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e742f148cb..9ed7f74902 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -153,8 +153,12 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well close(); -topic.removeSubscription(subName); -} + +// when topic closes: it iterates through concurrent-subscription map to close each subscription. so, +// topic.remove again try to access same map which creates deadlock. so, execute it in different thread. +topic.getBrokerService().pulsar().getExecutor().submit(() ->{ +topic.removeSubscription(subName); +});} } // invalid consumer remove will throw an exception This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 closed pull request #1718: Fix: handle invalid markDelete position at managed-cursor
jai1 closed pull request #1718: Fix: handle invalid markDelete position at managed-cursor URL: https://github.com/apache/incubator-pulsar/pull/1718 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 22cdf3d0f4..81727841b6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1291,7 +1291,7 @@ public void asyncMarkDelete(final Position position, Mappropertie final MarkDeleteCallback callback, final Object ctx) { checkNotNull(position); checkArgument(position instanceof PositionImpl); - + if (STATE_UPDATER.get(this) == State.Closed) { callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -1312,6 +1312,16 @@ public void asyncMarkDelete(final Position position, Map propertie log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } PositionImpl newPosition = (PositionImpl) position; + +if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) { +if (log.isDebugEnabled()) { +log.debug( +"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", +ledger.getName(), position, ledger.getLastConfirmedEntry(), name); +} +callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); +return; +} lock.writeLock().lock(); try { @@ -1509,6 +1519,16 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba ledger.getName(), name, 
pos, individualDeletedMessages, markDeletePosition, previousPosition); } +if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) { +if (log.isDebugEnabled()) { +log.debug( +"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", +ledger.getName(), position, ledger.getLastConfirmedEntry(), name); +} +callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); +return; +} + if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 61e9b3da78..59e159456c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -2614,5 +2615,60 @@ public void operationFailed(MetaStoreException e) { assertEquals(entries.size(), totalAddEntries / 2); } +@Test +public void testInvalidMarkDelete() throws Exception { +ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + +ManagedCursor cursor = ledger.openCursor("c1"); +Position readPosition = cursor.getReadPosition(); +Position markDeletePosition = 
cursor.getMarkDeletedPosition(); + +List addedPositions = new ArrayList<>(); +for (int i = 0; i < 20; i++) { +Position p = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); +addedPositions.add(p); +} + +// validate: cursor.asyncMarkDelete(..) +CountDownLatch markDeleteCallbackLatch = new CountDownLatch(1); +
[incubator-pulsar] branch branch-1.22 updated: Fix: handle invalid markDelete position at managed-cursor (#1718)
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 1875841 Fix: handle invalid markDelete position at managed-cursor (#1718) 1875841 is described below commit 187584155bcacec0900e8f6203a5f81a1e60ec7d Author: Jai AsherAuthorDate: Wed May 2 13:14:48 2018 -0700 Fix: handle invalid markDelete position at managed-cursor (#1718) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 - .../bookkeeper/mledger/impl/ManagedCursorTest.java | 56 ++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 22cdf3d..8172784 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1291,7 +1291,7 @@ public class ManagedCursorImpl implements ManagedCursor { final MarkDeleteCallback callback, final Object ctx) { checkNotNull(position); checkArgument(position instanceof PositionImpl); - + if (STATE_UPDATER.get(this) == State.Closed) { callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -1312,6 +1312,16 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } PositionImpl newPosition = (PositionImpl) position; + +if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) { +if (log.isDebugEnabled()) { +log.debug( +"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", +ledger.getName(), position, ledger.getLastConfirmedEntry(), name); 
+} +callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); +return; +} lock.writeLock().lock(); try { @@ -1509,6 +1519,16 @@ public class ManagedCursorImpl implements ManagedCursor { ledger.getName(), name, pos, individualDeletedMessages, markDeletePosition, previousPosition); } +if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) { +if (log.isDebugEnabled()) { +log.debug( +"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", +ledger.getName(), position, ledger.getLastConfirmedEntry(), name); +} +callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); +return; +} + if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 61e9b3d..59e1594 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -2614,5 +2615,60 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(entries.size(), totalAddEntries / 2); } 
+@Test +public void testInvalidMarkDelete() throws Exception { +ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + +ManagedCursor cursor = ledger.openCursor("c1"); +Position readPosition = cursor.getReadPosition(); +Position markDeletePosition = cursor.getMarkDeletedPosition(); + +List
[GitHub] lucperkins opened a new pull request #1720: Fix broken image
lucperkins opened a new pull request #1720: Fix broken image URL: https://github.com/apache/incubator-pulsar/pull/1720 An image in the Concepts and Architecture doc wasn't properly carried over. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #1717: Fix website sidebar
merlimat closed pull request #1717: Fix website sidebar URL: https://github.com/apache/incubator-pulsar/pull/1717 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site/Makefile b/site/Makefile index 9fcd9d6a18..3694bdf6de 100644 --- a/site/Makefile +++ b/site/Makefile @@ -63,7 +63,7 @@ travis_build: javadoc travis_publish: scripts/publish-website.sh -build: api_docs +build: api_docs swagger_docs_build scripts/build-all-versions.sh serve: clean_local @@ -86,7 +86,7 @@ swagger_definition_copy: (cd $(shell git rev-parse --show-toplevel) && \ cp pulsar-broker/target/docs/swagger.json site/_data/admin-rest-api-swagger.json) -swagger_docs_update: swagger_definition_gen swagger_definition_copy +swagger_docs_build: swagger_definition_gen swagger_definition_copy protobuf_doc_gen: scripts/protobuf-doc-gen.sh @@ -96,4 +96,4 @@ protobuf_setup: api_docs: javadoc python_doc_gen cpp_doc_gen -publish: deep_clean setup swagger_docs_update build +publish: deep_clean setup build diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml index c145a18093..9f701bcc5c 100644 --- a/site/_data/sidebar.yaml +++ b/site/_data/sidebar.yaml @@ -82,7 +82,7 @@ groups: endpoint: Dashboard - title: Pulsar statistics endpoint: Stats - - Load distribution + - title: Load distribution endpoint: LoadDistribution - title: Pulsar proxy endpoint: Proxy This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Fix website sidebar (#1717)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8056df5 Fix website sidebar (#1717) 8056df5 is described below commit 8056df573514e2a34224ee0cfe7385a9fd179589 Author: Luc PerkinsAuthorDate: Wed May 2 12:32:17 2018 -0700 Fix website sidebar (#1717) * use proper YAML syntax * add swagger docs build to make build command * rename make commands --- site/Makefile | 6 +++--- site/_data/sidebar.yaml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/site/Makefile b/site/Makefile index 9fcd9d6..3694bdf 100644 --- a/site/Makefile +++ b/site/Makefile @@ -63,7 +63,7 @@ travis_build: javadoc travis_publish: scripts/publish-website.sh -build: api_docs +build: api_docs swagger_docs_build scripts/build-all-versions.sh serve: clean_local @@ -86,7 +86,7 @@ swagger_definition_copy: (cd $(shell git rev-parse --show-toplevel) && \ cp pulsar-broker/target/docs/swagger.json site/_data/admin-rest-api-swagger.json) -swagger_docs_update: swagger_definition_gen swagger_definition_copy +swagger_docs_build: swagger_definition_gen swagger_definition_copy protobuf_doc_gen: scripts/protobuf-doc-gen.sh @@ -96,4 +96,4 @@ protobuf_setup: api_docs: javadoc python_doc_gen cpp_doc_gen -publish: deep_clean setup swagger_docs_update build +publish: deep_clean setup build diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml index c145a18..9f701bc 100644 --- a/site/_data/sidebar.yaml +++ b/site/_data/sidebar.yaml @@ -82,7 +82,7 @@ groups: endpoint: Dashboard - title: Pulsar statistics endpoint: Stats - - Load distribution + - title: Load distribution endpoint: LoadDistribution - title: Pulsar proxy endpoint: Proxy -- To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] jai1 opened a new pull request #1718: Fix: handle invalid markDelete position at managed-cursor
jai1 opened a new pull request #1718: Fix: handle invalid markDelete position at managed-cursor URL: https://github.com/apache/incubator-pulsar/pull/1718 Backporting #1554. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1717: Fix website sidebar
merlimat commented on issue #1717: Fix website sidebar URL: https://github.com/apache/incubator-pulsar/pull/1717#issuecomment-386085307 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface
jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185603311 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java ## @@ -29,7 +32,9 @@ * There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink * should be implemented using this interface to ensure supporting effective-once. */ -public interface RuntimeSink extends Sink { +//public interface RuntimeSink extends Sink { Review comment: yup will remove This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface
jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185603124 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -18,5 +18,249 @@ */ package org.apache.pulsar.functions.sink; -public class PulsarSink { +import com.google.common.annotations.VisibleForTesting; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.Producers; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.utils.FunctionConfig; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +public class PulsarSink implements RuntimeSink { + +private PulsarClient client; +private PulsarSinkConfig pulsarSinkConfig; +private SerDe outputSerDe; + +private PulsarSinkProcessor pulsarSinkProcessor; + +private interface PulsarSinkProcessor { +void initializeOutputProducer(String outputTopic) throws Exception; + +void sendOutputMessage(MessageBuilder outputMsgBuilder, + 
PulsarRecord pulsarRecord) throws Exception; + +void close() throws Exception; +} + +private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { +private Producerproducer; + +@Override +public void initializeOutputProducer(String outputTopic) throws Exception { +this.producer = AbstractOneOuputTopicProducers.createProducer( +client, pulsarSinkConfig.getTopic()); +} + +@Override +public void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception { +Message outputMsg = outputMsgBuilder.build(); +this.producer.sendAsync(outputMsg); +} + +@Override +public void close() throws Exception { +if (null != producer) { +try { +producer.close(); +} catch (PulsarClientException e) { +log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +} +} +} +} + +private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { +private Producer producer; + +@Override +public void initializeOutputProducer(String outputTopic) throws Exception { +this.producer = AbstractOneOuputTopicProducers.createProducer( +client, pulsarSinkConfig.getTopic()); +} + +@Override +public void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception { +Message outputMsg = outputMsgBuilder.build(); +this.producer.sendAsync(outputMsg).thenAccept(messageId -> pulsarRecord.ack()); +} + +@Override +public void close() throws Exception { +if (null != producer) { +try { +producer.close(); +} catch (PulsarClientException e) { +log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +} +} +} +} + +private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener { + +@Getter(AccessLevel.PACKAGE) +protected Producers outputProducer; + +@Override +public void initializeOutputProducer(String outputTopic) throws Exception { +outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); 
+outputProducer.initialize(); +} + +@Override +public void sendOutputMessage(MessageBuilder outputMsgBuilder, PulsarRecord pulsarRecord) +throws Exception { + +//
[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface
jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602904 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -18,5 +18,249 @@ */ package org.apache.pulsar.functions.sink; -public class PulsarSink { +import com.google.common.annotations.VisibleForTesting; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.Producers; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.utils.FunctionConfig; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +public class PulsarSink implements RuntimeSink { + +private PulsarClient client; +private PulsarSinkConfig pulsarSinkConfig; +private SerDe outputSerDe; + +private PulsarSinkProcessor pulsarSinkProcessor; + +private interface PulsarSinkProcessor { +void initializeOutputProducer(String outputTopic) throws Exception; + +void sendOutputMessage(MessageBuilder outputMsgBuilder, + 
PulsarRecord pulsarRecord) throws Exception; + +void close() throws Exception; +} + +private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { +private Producerproducer; + +@Override +public void initializeOutputProducer(String outputTopic) throws Exception { +this.producer = AbstractOneOuputTopicProducers.createProducer( +client, pulsarSinkConfig.getTopic()); +} + +@Override +public void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception { +Message outputMsg = outputMsgBuilder.build(); +this.producer.sendAsync(outputMsg); +} + +@Override +public void close() throws Exception { +if (null != producer) { +try { +producer.close(); +} catch (PulsarClientException e) { +log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); +} +} +} +} + +private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { Review comment: this cannot be static. I am using the PulsarClient from the parent class This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface
jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602874 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -18,5 +18,249 @@ */ package org.apache.pulsar.functions.sink; -public class PulsarSink { +import com.google.common.annotations.VisibleForTesting; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.Producers; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.utils.FunctionConfig; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +public class PulsarSink implements RuntimeSink { + +private PulsarClient client; +private PulsarSinkConfig pulsarSinkConfig; +private SerDe outputSerDe; + +private PulsarSinkProcessor pulsarSinkProcessor; + +private interface PulsarSinkProcessor { +void initializeOutputProducer(String outputTopic) throws Exception; + +void sendOutputMessage(MessageBuilder outputMsgBuilder, + 
PulsarRecord pulsarRecord) throws Exception; + +void close() throws Exception; +} + +private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { Review comment: this cannot be static. I am using the PulsarClient from the parent class This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface
jerrypeng commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185602541 ## File path: pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java ## @@ -45,5 +45,5 @@ * @param value output value * @return Completable future fo async publish request */ -CompletableFuture write(T value); -} \ No newline at end of file +CompletableFuture write(T value) throws Exception; Review comment: I agree. I will change This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lucperkins commented on issue #1717: Fix website sidebar
lucperkins commented on issue #1717: Fix website sidebar URL: https://github.com/apache/incubator-pulsar/pull/1717#issuecomment-386073972 @merlimat I made some additional changes that will fix the issue with the REST API docs not being generated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lucperkins opened a new pull request #1717: Fix website sidebar
lucperkins opened a new pull request #1717: Fix website sidebar URL: https://github.com/apache/incubator-pulsar/pull/1717 The website build is now broken from a previous PR. This change restores proper YAML syntax in the sidebar definition. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185578889 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { +@Data +class MockLedgerEntry implements LedgerEntry { +public byte blockPadding = 0xB; +long ledgerId; +long entryId; +long length; +byte entryBytes[]; +ByteBuf entryBuffer; + +MockLedgerEntry(long ledgerId, long entryId, long length) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.length = length; +this.entryBytes = new byte[(int)length]; +entryBuffer = Unpooled.wrappedBuffer(entryBytes); +entryBuffer.writerIndex(0); +IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); +} + +@Override +public ByteBuffer getEntryNioBuffer() { +return null; +} + +@Override +public 
LedgerEntry duplicate() { +return null; +} + +@Override +public void close() { +entryBuffer.release(); +} +} + +@Data +class MockLedgerEntries implements LedgerEntries { +int ledgerId; +int startEntryId; +int count; +int entrySize; +List entries; + +MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { +this.ledgerId = ledgerId; +this.startEntryId = startEntryId; +this.count = count; +this.entrySize = entrySize; +this.entries = Lists.newArrayList(count); + +IntStream.range(startEntryId, startEntryId + count).forEach(i -> +entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); +} + +@Override +public void close() { +entries.clear(); +} + +@Override +public LedgerEntry getEntry(long entryId) { +if (entryId < startEntryId || entryId >= startEntryId + count) { +return null; +} + +return entries.get(((int)entryId - startEntryId)); +} + +@Override +public Iterator iterator() { +return entries.iterator(); +} +} + +class MockReadHandle implements ReadHandle { +int ledgerId; +int entrySize; +int lac; +MockReadHandle(int ledgerId, int entrySize, int lac) { +this.ledgerId = ledgerId; +this.entrySize = entrySize; +this.lac = lac; +
[incubator-pulsar] branch master updated: Add release notes page in website (#1705)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4fff8d6 Add release notes page in website (#1705) 4fff8d6 is described below commit 4fff8d6827544772ed38c0c6f5dcc09a3f7268e4 Author: Matteo MerliAuthorDate: Wed May 2 10:35:44 2018 -0700 Add release notes page in website (#1705) --- site/download.md | 4 +- site/release-notes.md | 363 ++ 2 files changed, 365 insertions(+), 2 deletions(-) diff --git a/site/download.md b/site/download.md index 9aab82f..94d45e8 100644 --- a/site/download.md +++ b/site/download.md @@ -46,9 +46,9 @@ Pulsar's Release Managers. We also provide `MD5` and `SHA-512` checksums for eve After you download the file, you should calculate a checksum for your download, and make sure it is the same as ours. -### Release notes for the {{ site.current_version }} release +### Release notes -[https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}](https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}) +[Release notes](../release-notes) for all Pulsar's versions ### Getting started diff --git a/site/release-notes.md b/site/release-notes.md new file mode 100644 index 000..629851a --- /dev/null +++ b/site/release-notes.md @@ -0,0 +1,363 @@ +--- +title: Apache Pulsar Release Notes +layout: content +--- + + + +## Apache incubator + +### 1.22.0-incubating 2018-03-06 + +This is the fourth of Apache Pulsar since entering the ASF incubator. 
+ +Major changes in this release include: + + Features + * [#896](https://github.com/apache/incubator-pulsar/pull/896) PIP-7 Introduce Failure-domain and Anti-affinity-namespace group + * [#1031](https://github.com/apache/incubator-pulsar/pull/1031) Add optional key/value metadata to producers/consumers + * [#1129](https://github.com/apache/incubator-pulsar/pull/1129) Added end to end encryption in C++ client + * [#1151](https://github.com/apache/incubator-pulsar/pull/1151) Added REST handler to create a subscription on a topic + * [#1087](https://github.com/apache/incubator-pulsar/pull/1087) Add basic authentication plugin + * [#1200](https://github.com/apache/incubator-pulsar/pull/1200) Add pluggable authorization mechanism + * [#1208](https://github.com/apache/incubator-pulsar/pull/1208) Add hostname-verification at client tls connection + * [#950](https://github.com/apache/incubator-pulsar/pull/950) Provided an DCOS Universe package for pulsar + * [#1046](https://github.com/apache/incubator-pulsar/pull/1046) Introduce config to skip non-recoverable data-ledger + * [#899](https://github.com/apache/incubator-pulsar/pull/899) Add subscription auth mode by prefix + * [#1135](https://github.com/apache/incubator-pulsar/pull/1135) Added infinite time retention configuration option + + Enhancements + + * [#1094](https://github.com/apache/incubator-pulsar/pull/1094) Include BoringSSL native implementation for faster TLS + * [#1204](https://github.com/apache/incubator-pulsar/pull/1204) Reduce size of buffer used to assemble batches + * [#930](https://github.com/apache/incubator-pulsar/pull/930) Perform async DNS resolution + * [#1124](https://github.com/apache/incubator-pulsar/pull/1124) Support Pulsar proxy from C++/Python client library + * [#1012](https://github.com/apache/incubator-pulsar/pull/1012) Made load shedding for load manager Dynamically configurable + * [#962](https://github.com/apache/incubator-pulsar/pull/962) Raw Reader for Pulsar Topics + * 
[#941](https://github.com/apache/incubator-pulsar/pull/941) Upgraded Jackson version + * [#1002](https://github.com/apache/incubator-pulsar/pull/1002), [#1169](https://github.com/apache/incubator-pulsar/pull/1169), [#1168](https://github.com/apache/incubator-pulsar/pull/1168) Making Pulsar Proxy more secure + * [#1029](https://github.com/apache/incubator-pulsar/pull/1029) Fix MessageRouter hash inconsistent on C++/Java client + + Fixes + + * [#1153](https://github.com/apache/incubator-pulsar/pull/1153) Fixed increase partitions on a partitioned topic + * [#1195](https://github.com/apache/incubator-pulsar/pull/1195) Ensure the checksum is not stripped after validation in the broker + * [#1203](https://github.com/apache/incubator-pulsar/pull/1203) Use duplicates when writing from ByteBuf pair to avoid multiple threads issues + * [#1210](https://github.com/apache/incubator-pulsar/pull/1210) Cancel keep-alive timer task after the proxy switch to TCP proxy + * [#1170](https://github.com/apache/incubator-pulsar/pull/1170) Upgrade BK version: BK-4.3.1.91-yahoo (fix: stats + DoubleByteBuf) + * [#875](https://github.com/apache/incubator-pulsar/pull/875) Bug fixes for Websocket proxy + +The complete list of changes can be
[GitHub] merlimat closed pull request #1705: Add release notes page in website
merlimat closed pull request #1705: Add release notes page in website URL: https://github.com/apache/incubator-pulsar/pull/1705 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site/download.md b/site/download.md index 9aab82fea0..94d45e837d 100644 --- a/site/download.md +++ b/site/download.md @@ -46,9 +46,9 @@ Pulsar's Release Managers. We also provide `MD5` and `SHA-512` checksums for eve After you download the file, you should calculate a checksum for your download, and make sure it is the same as ours. -### Release notes for the {{ site.current_version }} release +### Release notes -[https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}](https://github.com/apache/incubator-pulsar/releases/tag/v{{site.current_version}}) +[Release notes](../release-notes) for all Pulsar's versions ### Getting started diff --git a/site/release-notes.md b/site/release-notes.md new file mode 100644 index 00..629851a0b9 --- /dev/null +++ b/site/release-notes.md @@ -0,0 +1,363 @@ +--- +title: Apache Pulsar Release Notes +layout: content +--- + + + +## Apache incubator + +### 1.22.0-incubating 2018-03-06 + +This is the fourth of Apache Pulsar since entering the ASF incubator. 
+ +Major changes in this release include: + + Features + * [#896](https://github.com/apache/incubator-pulsar/pull/896) PIP-7 Introduce Failure-domain and Anti-affinity-namespace group + * [#1031](https://github.com/apache/incubator-pulsar/pull/1031) Add optional key/value metadata to producers/consumers + * [#1129](https://github.com/apache/incubator-pulsar/pull/1129) Added end to end encryption in C++ client + * [#1151](https://github.com/apache/incubator-pulsar/pull/1151) Added REST handler to create a subscription on a topic + * [#1087](https://github.com/apache/incubator-pulsar/pull/1087) Add basic authentication plugin + * [#1200](https://github.com/apache/incubator-pulsar/pull/1200) Add pluggable authorization mechanism + * [#1208](https://github.com/apache/incubator-pulsar/pull/1208) Add hostname-verification at client tls connection + * [#950](https://github.com/apache/incubator-pulsar/pull/950) Provided an DCOS Universe package for pulsar + * [#1046](https://github.com/apache/incubator-pulsar/pull/1046) Introduce config to skip non-recoverable data-ledger + * [#899](https://github.com/apache/incubator-pulsar/pull/899) Add subscription auth mode by prefix + * [#1135](https://github.com/apache/incubator-pulsar/pull/1135) Added infinite time retention configuration option + + Enhancements + + * [#1094](https://github.com/apache/incubator-pulsar/pull/1094) Include BoringSSL native implementation for faster TLS + * [#1204](https://github.com/apache/incubator-pulsar/pull/1204) Reduce size of buffer used to assemble batches + * [#930](https://github.com/apache/incubator-pulsar/pull/930) Perform async DNS resolution + * [#1124](https://github.com/apache/incubator-pulsar/pull/1124) Support Pulsar proxy from C++/Python client library + * [#1012](https://github.com/apache/incubator-pulsar/pull/1012) Made load shedding for load manager Dynamically configurable + * [#962](https://github.com/apache/incubator-pulsar/pull/962) Raw Reader for Pulsar Topics + * 
[#941](https://github.com/apache/incubator-pulsar/pull/941) Upgraded Jackson version + * [#1002](https://github.com/apache/incubator-pulsar/pull/1002), [#1169](https://github.com/apache/incubator-pulsar/pull/1169), [#1168](https://github.com/apache/incubator-pulsar/pull/1168) Making Pulsar Proxy more secure + * [#1029](https://github.com/apache/incubator-pulsar/pull/1029) Fix MessageRouter hash inconsistent on C++/Java client + + Fixes + + * [#1153](https://github.com/apache/incubator-pulsar/pull/1153) Fixed increase partitions on a partitioned topic + * [#1195](https://github.com/apache/incubator-pulsar/pull/1195) Ensure the checksum is not stripped after validation in the broker + * [#1203](https://github.com/apache/incubator-pulsar/pull/1203) Use duplicates when writing from ByteBuf pair to avoid multiple threads issues + * [#1210](https://github.com/apache/incubator-pulsar/pull/1210) Cancel keep-alive timer task after the proxy switch to TCP proxy + * [#1170](https://github.com/apache/incubator-pulsar/pull/1170) Upgrade BK version: BK-4.3.1.91-yahoo (fix: stats + DoubleByteBuf) + * [#875](https://github.com/apache/incubator-pulsar/pull/875) Bug fixes for Websocket proxy + +The complete list of changes can be found at: +https://github.com/apache/incubator-pulsar/milestone/11?closed=1 + +https://github.com/apache/incubator-pulsar/releases/tag/v1.22.0-incubating + +### 1.21.0-incubating 2017-12-17 + +This is the third of Apache Pulsar since entering the ASF incubator. + +Major
[GitHub] sijie closed pull request #1687: Don't offload empty ledgers
sijie closed pull request #1687: Don't offload empty ledgers URL: https://github.com/apache/incubator-pulsar/pull/1687 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f1d71007a4..1a606b7d4a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1966,7 +1966,8 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct long firstLedgerRetained = current; for (LedgerInfo ls : ledgers.headMap(current).values()) { if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { -if (!ls.getOffloadContext().getComplete()) { +// don't offload if ledger has already been offloaded, or is empty +if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) { ledgersToOffload.add(ls); } } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 278182cef3..76cbbb6de5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import 
org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; @@ -633,6 +635,47 @@ public void testOffloadDeleteIncomplete() throws Exception { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } +@Test +public void testDontOffloadEmpty() throws Exception { +MockLedgerOffloader offloader = new MockLedgerOffloader(); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setMaxEntriesPerLedger(10); +config.setMinimumRolloverTime(0, TimeUnit.SECONDS); +config.setRetentionTime(10, TimeUnit.MINUTES); +config.setLedgerOffloader(offloader); +ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + +int i = 0; +for (; i < 35; i++) { +String content = "entry-" + i; +ledger.addEntry(content.getBytes()); +} +Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4); + +long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); +long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId(); +long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId(); +long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId(); + +// make an ledger empty +Field ledgersField = ledger.getClass().getDeclaredField("ledgers"); +ledgersField.setAccessible(true); +Mapledgers = (Map )ledgersField.get(ledger); +ledgers.put(secondLedgerId, + ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build()); + +PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry()); +Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId); +Assert.assertEquals(firstUnoffloaded.getEntryId(), 0); + +Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4); +Assert.assertEquals(ledger.getLedgersInfoAsList().stream() +.filter(e -> e.getOffloadContext().getComplete()) 
+.map(e -> e.getLedgerId()).collect(Collectors.toSet()), +offloader.offloadedLedgers()); +Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId)); +} + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
[incubator-pulsar] branch master updated: Don't offload empty ledgers (#1687)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new b982943 Don't offload empty ledgers (#1687) b982943 is described below commit b982943c77972211c937270d305d14f9d12d5f84 Author: Ivan KellyAuthorDate: Wed May 2 19:10:25 2018 +0200 Don't offload empty ledgers (#1687) It shouldn't be possible for a ledger in a managed ledger to be empty (it should be cleaned up on recovery), but this patch adds defensive code so that if they do exist for some reason, they won't be offloaded. Master Issue: #1511 --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 43 ++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f1d7100..1a606b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1966,7 +1966,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { long firstLedgerRetained = current; for (LedgerInfo ls : ledgers.headMap(current).values()) { if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { -if (!ls.getOffloadContext().getComplete()) { +// don't offload if ledger has already been offloaded, or is empty +if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) { ledgersToOffload.add(ls); } } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 278182c..76cbbb6 100644 --- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl; import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; @@ -633,6 +635,47 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } +@Test +public void testDontOffloadEmpty() throws Exception { +MockLedgerOffloader offloader = new MockLedgerOffloader(); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setMaxEntriesPerLedger(10); +config.setMinimumRolloverTime(0, TimeUnit.SECONDS); +config.setRetentionTime(10, TimeUnit.MINUTES); +config.setLedgerOffloader(offloader); +ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + +int i = 0; +for (; i < 35; i++) { +String content = "entry-" + i; +ledger.addEntry(content.getBytes()); +} +Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4); + +long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); +long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId(); +long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId(); +long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId(); + +// make an ledger empty +Field ledgersField = 
ledger.getClass().getDeclaredField("ledgers"); +ledgersField.setAccessible(true); +Map ledgers = (Map )ledgersField.get(ledger); +ledgers.put(secondLedgerId, + ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build()); + +PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry()); +Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId); +Assert.assertEquals(firstUnoffloaded.getEntryId(), 0); + +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185564229 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { Review comment: It could be simplified further if the entriesByteBuf cleanup is left until close. ``` if (entriesByteBuf.isEmpty()) { entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); } if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { // not able to place a new Entry. dataBlockFullOffset = bytesReadOffset; return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % 4]; } else { ... ``` This is an automated message from the Apache Git
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185561252 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { +// not able to place a new Entry. +entriesByteBuf.forEach(buf -> buf.release()); +entriesByteBuf.clear(); +dataBlockFullOffset = bytesReadOffset; +} +} + +if (bytesReadOffset < dataBlockFullOffset) { +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf =
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185566416 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { +@Data +class MockLedgerEntry implements LedgerEntry { +public byte blockPadding = 0xB; +long ledgerId; +long entryId; +long length; +byte entryBytes[]; +ByteBuf entryBuffer; + +MockLedgerEntry(long ledgerId, long entryId, long length) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.length = length; +this.entryBytes = new byte[(int)length]; +entryBuffer = Unpooled.wrappedBuffer(entryBytes); +entryBuffer.writerIndex(0); +IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); +} + +@Override +public ByteBuffer getEntryNioBuffer() { +return null; +} + +@Override +public 
LedgerEntry duplicate() { +return null; +} + +@Override +public void close() { +entryBuffer.release(); +} +} + +@Data +class MockLedgerEntries implements LedgerEntries { +int ledgerId; +int startEntryId; +int count; +int entrySize; +List entries; + +MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { +this.ledgerId = ledgerId; +this.startEntryId = startEntryId; +this.count = count; +this.entrySize = entrySize; +this.entries = Lists.newArrayList(count); + +IntStream.range(startEntryId, startEntryId + count).forEach(i -> +entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); +} + +@Override +public void close() { +entries.clear(); +} + +@Override +public LedgerEntry getEntry(long entryId) { +if (entryId < startEntryId || entryId >= startEntryId + count) { +return null; +} + +return entries.get(((int)entryId - startEntryId)); +} + +@Override +public Iterator iterator() { +return entries.iterator(); +} +} + +class MockReadHandle implements ReadHandle { +int ledgerId; +int entrySize; +int lac; +MockReadHandle(int ledgerId, int entrySize, int lac) { +this.ledgerId = ledgerId; +this.entrySize = entrySize; +this.lac = lac; +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185568883 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { +@Data +class MockLedgerEntry implements LedgerEntry { +public byte blockPadding = 0xB; +long ledgerId; +long entryId; +long length; +byte entryBytes[]; +ByteBuf entryBuffer; + +MockLedgerEntry(long ledgerId, long entryId, long length) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.length = length; +this.entryBytes = new byte[(int)length]; +entryBuffer = Unpooled.wrappedBuffer(entryBytes); +entryBuffer.writerIndex(0); +IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); +} + +@Override +public ByteBuffer getEntryNioBuffer() { +return null; +} + +@Override +public 
LedgerEntry duplicate() { +return null; +} + +@Override +public void close() { +entryBuffer.release(); +} +} + +@Data +class MockLedgerEntries implements LedgerEntries { +int ledgerId; +int startEntryId; +int count; +int entrySize; +List entries; + +MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { +this.ledgerId = ledgerId; +this.startEntryId = startEntryId; +this.count = count; +this.entrySize = entrySize; +this.entries = Lists.newArrayList(count); + +IntStream.range(startEntryId, startEntryId + count).forEach(i -> +entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); +} + +@Override +public void close() { +entries.clear(); +} + +@Override +public LedgerEntry getEntry(long entryId) { +if (entryId < startEntryId || entryId >= startEntryId + count) { +return null; +} + +return entries.get(((int)entryId - startEntryId)); +} + +@Override +public Iterator iterator() { +return entries.iterator(); +} +} + +class MockReadHandle implements ReadHandle { +int ledgerId; +int entrySize; +int lac; +MockReadHandle(int ledgerId, int entrySize, int lac) { +this.ledgerId = ledgerId; +this.entrySize = entrySize; +this.lac = lac; +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185560570 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { +// not able to place a new Entry. +entriesByteBuf.forEach(buf -> buf.release()); +entriesByteBuf.clear(); +dataBlockFullOffset = bytesReadOffset; +} +} + +if (bytesReadOffset < dataBlockFullOffset) { +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf =
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185557470 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { +// not able to place a new Entry. +entriesByteBuf.forEach(buf -> buf.release()); +entriesByteBuf.clear(); +dataBlockFullOffset = bytesReadOffset; +} +} + +if (bytesReadOffset < dataBlockFullOffset) { +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf =
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185562881 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java ## @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The BlockAwareSegmentInputStreamImpl for each cold storage data block. + * It gets data from ledger, and will be read out the content for a data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + padding + */ +public class BlockAwareSegmentInputStreamImpl extends InputStream implements BlockAwareSegmentInputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class); + +private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int ENTRIES_PER_READ = 100; +// buf the entry size and entry id. 
+private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; + +public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < blockSize); + +// once reach the end of entry buffer, start a new read. +if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { Review comment: moving the inner check out of the outer check will remove code duplication ``` if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) { entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); } if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { // not able to place a new Entry. entriesByteBuf.forEach(buf -> buf.release()); entriesByteBuf.clear(); dataBlockFullOffset = bytesReadOffset; return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % 4]; } else if
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185565049 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { +@Data +class MockLedgerEntry implements LedgerEntry { +public byte blockPadding = 0xB; +long ledgerId; +long entryId; +long length; +byte entryBytes[]; +ByteBuf entryBuffer; + +MockLedgerEntry(long ledgerId, long entryId, long length) { +this.ledgerId = ledgerId; +this.entryId = entryId; +this.length = length; +this.entryBytes = new byte[(int)length]; +entryBuffer = Unpooled.wrappedBuffer(entryBytes); +entryBuffer.writerIndex(0); +IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); +} + +@Override +public ByteBuffer getEntryNioBuffer() { +return null; +} + +@Override +public 
LedgerEntry duplicate() { +return null; +} + +@Override +public void close() { +entryBuffer.release(); +} +} + +@Data +class MockLedgerEntries implements LedgerEntries { +int ledgerId; +int startEntryId; +int count; +int entrySize; +List entries; + +MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { +this.ledgerId = ledgerId; +this.startEntryId = startEntryId; +this.count = count; +this.entrySize = entrySize; +this.entries = Lists.newArrayList(count); + +IntStream.range(startEntryId, startEntryId + count).forEach(i -> +entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); +} + +@Override +public void close() { +entries.clear(); +} + +@Override +public LedgerEntry getEntry(long entryId) { +if (entryId < startEntryId || entryId >= startEntryId + count) { +return null; +} + +return entries.get(((int)entryId - startEntryId)); +} + +@Override +public Iterator iterator() { +return entries.iterator(); +} +} + +class MockReadHandle implements ReadHandle { +int ledgerId; +int entrySize; +int lac; +MockReadHandle(int ledgerId, int entrySize, int lac) { +this.ledgerId = ledgerId; +this.entrySize = entrySize; +this.lac = lac; +
[GitHub] sijie commented on issue #1669: PIP-17: provide DataBlockHeader and implementation
sijie commented on issue #1669: PIP-17: provide DataBlockHeader and implementation URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-386049447 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: PIP-17: the part of index block for offload. (#1593)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1404944 PIP-17: the part of index block for offload. (#1593) 1404944 is described below commit 14049448e5d4170b7ad90fefb8568eb265f278e7 Author: Jia ZhaiAuthorDate: Thu May 3 01:03:02 2018 +0800 PIP-17: the part of index block for offload. (#1593) * add offload index block * change follow ivan's comments * change to use api.LedgerMetadata * change following @sijie's comments * avoid using org.apache.bookkeeper.client.LedgerMetadata serialization * change following @ivan's comments --- .../pulsar/broker/s3offload/OffloadIndexBlock.java | 63 .../broker/s3offload/OffloadIndexBlockBuilder.java | 72 + .../pulsar/broker/s3offload/OffloadIndexEntry.java | 50 +++ .../impl/OffloadIndexBlockBuilderImpl.java | 77 + .../s3offload/impl/OffloadIndexBlockImpl.java | 337 + .../s3offload/impl/OffloadIndexEntryImpl.java | 58 .../apache/pulsar/s3offload/OffloadIndexTest.java | 237 +++ 7 files changed, 894 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java new file mode 100644 index 000..8f9d3ce --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * + * The Index block abstraction used for offload a ledger to long term storage. + * + */ +@Unstable +public interface OffloadIndexBlock extends Closeable { + +/** + * Get the content of the index block as InputStream. + * Read out in format: + * | index_magic_header | index_block_len | index_entry_count | + * |segment_metadata_length | segment metadata | index entries | + */ +InputStream toStream() throws IOException; + +/** + * Get the related OffloadIndexEntry that contains the given messageEntryId. + * + * @param messageEntryId + * the entry id of message + * @return the offload index entry + */ +OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException; + +/** + * Get the entry count that contained in this index Block. + */ +int getEntryCount(); + +/** + * Get LedgerMetadata. 
+ */ +LedgerMetadata getLedgerMetadata(); + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java new file mode 100644 index 000..8ec0395 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
[GitHub] sijie closed pull request #1593: PIP-17: the part of index block for offload.
sijie closed pull request #1593: PIP-17: the part of index block for offload. URL: https://github.com/apache/incubator-pulsar/pull/1593 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java new file mode 100644 index 00..8f9d3cecb1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * + * The Index block abstraction used for offload a ledger to long term storage. 
+ * + */ +@Unstable +public interface OffloadIndexBlock extends Closeable { + +/** + * Get the content of the index block as InputStream. + * Read out in format: + * | index_magic_header | index_block_len | index_entry_count | + * |segment_metadata_length | segment metadata | index entries | + */ +InputStream toStream() throws IOException; + +/** + * Get the related OffloadIndexEntry that contains the given messageEntryId. + * + * @param messageEntryId + * the entry id of message + * @return the offload index entry + */ +OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException; + +/** + * Get the entry count that contained in this index Block. + */ +int getEntryCount(); + +/** + * Get LedgerMetadata. + */ +LedgerMetadata getLedgerMetadata(); + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java new file mode 100644 index 00..8ec0395498 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; +import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl; + +/** + * Interface for builder of index block used for offload a ledger to long term storage. + */ +@Unstable +@LimitedPrivate +public interface OffloadIndexBlockBuilder { + +/** + * Build index block with the passed in ledger metadata. + * + * @param metadata the ledger metadata + */ +OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata); + +/** + * Add one payload block related information into index block. + * It contains the first entryId in payload block, the
[GitHub] sijie commented on issue #1715: Broker should not start replicator for root partitioned-topic
sijie commented on issue #1715: Broker should not start replicator for root partitioned-topic URL: https://github.com/apache/incubator-pulsar/pull/1715#issuecomment-386048749 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #1716: Fix: deadlock while closing non-shared consumer
sijie commented on issue #1716: Fix: deadlock while closing non-shared consumer URL: https://github.com/apache/incubator-pulsar/pull/1716#issuecomment-386048679 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Rest API for Ledger Offloading (#1639)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5f678e0 Rest API for Ledger Offloading (#1639) 5f678e0 is described below commit 5f678e099095b133138ea0e536e06e916d8dc09e Author: Ivan KellyAuthorDate: Wed May 2 18:59:58 2018 +0200 Rest API for Ledger Offloading (#1639) * Rest API for Ledger Offloading Implemented for both V1 and V2 topic name formats. API takes a message ID, up to which the broker will try to offload messages. It returns the message ID of the first message in the topic which has not been offloaded. This patch also adds basic support for setting the Offloader implementation in the broker (needed for testing). Subsequent patches will make this configurable through ServiceConfiguration. * Split compaction endpoint into two One for triggering and one for getting the current status. 
* Add conflict to rest api doc * Fixed build --- .../org/apache/pulsar/broker/PulsarService.java| 6 + .../broker/admin/impl/PersistentTopicsBase.java| 24 +++- .../pulsar/broker/admin/v1/PersistentTopics.java | 37 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 35 - .../pulsar/broker/service/BrokerService.java | 2 + .../broker/service/persistent/PersistentTopic.java | 57 +++- .../pulsar/broker/admin/AdminApiOffloadTest.java | 148 + .../apache/pulsar/broker/admin/AdminApiTest.java | 14 +- .../pulsar/broker/admin/v1/V1_AdminApiTest.java| 10 +- .../client/admin/LongRunningProcessStatus.java | 18 +-- .../pulsar/client/admin/OffloadProcessStatus.java | 55 .../org/apache/pulsar/client/admin/Topics.java | 19 ++- .../pulsar/client/admin/internal/TopicsImpl.java | 30 - .../pulsar/admin/cli/CmdPersistentTopics.java | 7 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 7 +- 15 files changed, 426 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c74ecd7..8d39fda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -46,7 +46,9 @@ import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.broker.admin.AdminResource; @@ -635,6 +637,10 @@ public class PulsarService implements AutoCloseable { return managedLedgerClientFactory.getManagedLedgerFactory(); } +public LedgerOffloader 
getManagedLedgerOffloader() { +return NullLedgerOffloader.INSTANCE; +} + public ZooKeeperCache getLocalZkCache() { return localZkCache; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ff50ec1..a354008 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -75,6 +76,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -86,7 +89,6 @@ import
[GitHub] sijie closed pull request #1639: Rest API for Ledger Offloading
sijie closed pull request #1639: Rest API for Ledger Offloading URL: https://github.com/apache/incubator-pulsar/pull/1639 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3d14b8b08f..6909a82a99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -46,7 +46,9 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.broker.admin.AdminResource; @@ -635,6 +637,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() { return managedLedgerClientFactory.getManagedLedgerFactory(); } +public LedgerOffloader getManagedLedgerOffloader() { +return NullLedgerOffloader.INSTANCE; +} + public ZooKeeperCache getLocalZkCache() { return localZkCache; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 21b14d075f..44f80cf166 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -54,6 +54,7 
@@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -74,6 +75,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -85,7 +88,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicDomain; @@ -1090,12 +1092,30 @@ protected void internalTriggerCompaction(boolean authoritative) { } } -protected CompactionStatus internalCompactionStatus(boolean authoritative) { +protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) { validateAdminOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.compactionStatus(); } +protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) { +validateAdminOperationOnTopic(authoritative); +PersistentTopic topic 
= (PersistentTopic) getTopicReference(topicName); +try { +topic.triggerOffload(messageId); +} catch (AlreadyRunningException e) { +throw new RestException(Status.CONFLICT, e.getMessage()); +} catch (Exception e) { +throw new RestException(e); +} +} + +protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) { +validateAdminOperationOnTopic(authoritative); +PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); +return topic.offloadStatus(); +} + public static CompletableFuture getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture metadataFuture
[GitHub] srkukarni commented on a change in pull request #1708: Refactor functions to use Sink interface
srkukarni commented on a change in pull request #1708: Refactor functions to use Sink interface URL: https://github.com/apache/incubator-pulsar/pull/1708#discussion_r185565400 ## File path: pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java ## @@ -45,5 +45,5 @@ * @param value output value * @return Completable future fo async publish request */ -CompletableFuture write(T value); -} \ No newline at end of file +CompletableFuture write(T value) throws Exception; Review comment: +1 on this This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #1639: Rest API for Ledger Offloading
sijie commented on issue #1639: Rest API for Ledger Offloading URL: https://github.com/apache/incubator-pulsar/pull/1639#issuecomment-386044513 @ivankelly yeah. will review. I was holding it after Matteo cut the 2.0 branch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 commented on issue #1707: Fixed authentication flow via Pulsar Proxy
jai1 commented on issue #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#issuecomment-386043230 @merlimat - OK, I changed the milestone to 2.0.1. Can you review this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1705: Add release notes page in website
merlimat commented on issue #1705: Add release notes page in website URL: https://github.com/apache/incubator-pulsar/pull/1705#issuecomment-386038126 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT
merlimat commented on issue #1709: Bumped version to 2.1.0-incubating-SNAPSHOT URL: https://github.com/apache/incubator-pulsar/pull/1709#issuecomment-386037127 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1707: Fixed authentication flow via Pulsar Proxy
merlimat commented on issue #1707: Fixed authentication flow via Pulsar Proxy URL: https://github.com/apache/incubator-pulsar/pull/1707#issuecomment-386035196 @jai1 This doesn't seem to me to be a critical enough error to stop the current RC. I think we can bump it to 2.0.1, where we'll get all the fixes (and we should do that release soon). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Added missing license header in terraform.tfvars (#1706)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new b837af8 Added missing license header in terraform.tfvars (#1706) b837af8 is described below commit b837af8612165bb880d7d66f8291f8163a5507a5 Author: Matteo MerliAuthorDate: Wed May 2 09:05:50 2018 -0700 Added missing license header in terraform.tfvars (#1706) --- deployment/terraform-ansible/aws/terraform.tfvars | 19 +++ pom.xml | 4 tests/README.md | 21 + 3 files changed, 44 insertions(+) diff --git a/deployment/terraform-ansible/aws/terraform.tfvars b/deployment/terraform-ansible/aws/terraform.tfvars index 65f4ea5..7af884b 100644 --- a/deployment/terraform-ansible/aws/terraform.tfvars +++ b/deployment/terraform-ansible/aws/terraform.tfvars @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + public_key_path = "~/.ssh/id_rsa.pub" region = "us-west-2" availability_zone = "us-west-2a" diff --git a/pom.xml b/pom.xml index 761c710..29e8dca 100644 --- a/pom.xml +++ b/pom.xml @@ -904,6 +904,7 @@ flexible messaging model and an intuitive client API. 
SCRIPT_STYLE SCRIPT_STYLE SCRIPT_STYLE +SCRIPT_STYLE @@ -967,6 +968,9 @@ flexible messaging model and an intuitive client API. **/*.htpasswd src/test/resources/athenz.conf.test deployment/terraform-ansible/templates/myid + + +**/requirements.txt diff --git a/tests/README.md b/tests/README.md index 2fb4e8f..ccd4a8f 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,3 +1,24 @@ + + This directory contains integration tests for Pulsar. The integration tests use a framework called [Arquillian Cube](https://github.com/arquillian/arquillian-cube) to bring up a bunch of docker containers running Pulsar services. TestNG can then be used to test functionallity against these containers. -- To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] merlimat closed pull request #1706: Added missing license header in terraform.tfvars
merlimat closed pull request #1706: Added missing license header in terraform.tfvars URL: https://github.com/apache/incubator-pulsar/pull/1706 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/deployment/terraform-ansible/aws/terraform.tfvars b/deployment/terraform-ansible/aws/terraform.tfvars index 65f4ea528b..7af884b91a 100644 --- a/deployment/terraform-ansible/aws/terraform.tfvars +++ b/deployment/terraform-ansible/aws/terraform.tfvars @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + public_key_path = "~/.ssh/id_rsa.pub" region = "us-west-2" availability_zone = "us-west-2a" diff --git a/pom.xml b/pom.xml index 761c710fd2..29e8dca769 100644 --- a/pom.xml +++ b/pom.xml @@ -904,6 +904,7 @@ flexible messaging model and an intuitive client API. SCRIPT_STYLE SCRIPT_STYLE SCRIPT_STYLE +SCRIPT_STYLE @@ -967,6 +968,9 @@ flexible messaging model and an intuitive client API. 
**/*.htpasswd src/test/resources/athenz.conf.test deployment/terraform-ansible/templates/myid + + +**/requirements.txt diff --git a/tests/README.md b/tests/README.md index 2fb4e8f58d..ccd4a8fe73 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,3 +1,24 @@ + + This directory contains integration tests for Pulsar. The integration tests use a framework called [Arquillian Cube](https://github.com/arquillian/arquillian-cube) to bring up a bunch of docker containers running Pulsar services. TestNG can then be used to test functionallity against these containers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185533001 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +} -if (entryByteBuf.readableBytes() == 0) { -entryByteBuf.release(); -entriesByteBuf.remove(0); +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +if (entryByteBuf.readableBytes() == 0) { +entryByteBuf.release(); +entriesByteBuf.remove(0); +blockEntryCount++; +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { Review comment: Thanks, will change it. 
This change did not follow the earlier comments, for two reasons. First, we need to keep dataBlockFullOffset in order to calculate the exact number of bytes read from the ledger. Second, the check `(bytesReadOffset + entryByteBuf.bytesAvailable()) > blockSize` only needs to be made when a new entry is about to be read. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185509828 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); Review comment: It will return an empty list, and line 93 will then throw an IndexOutOfBoundsException. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185498765 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); Review comment: what happens if readNextEntriesFromLedger cannot read any more entries? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185498146 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +} -if (entryByteBuf.readableBytes() == 0) { -entryByteBuf.release(); -entriesByteBuf.remove(0); +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +if (entryByteBuf.readableBytes() == 0) { +entryByteBuf.release(); +entriesByteBuf.remove(0); +blockEntryCount++; +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { +// not able to place a new Entry. +entriesByteBuf.forEach(buf -> buf.release()); +entriesByteBuf.clear(); +dataBlockFullOffset = bytesReadOffset; } - -return ret; -} catch (InterruptedException | ExecutionException e) { -log.error("Exception when get CompletableFuture. 
", e); -throw new IOException(e); } -} -// read entries from ledger, and pre-handle the returned ledgerEntries. -private void readNextEntriesFromLedger() throws InterruptedException, ExecutionException { -checkState(bytesReadOffset == blockBytesHave); +return ret; +} -long start = startEntryId + blockEntryCount; -long end = Math.min(start + entriesNumberEachRead - 1, ledger.getLastAddConfirmed()); +private List readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException { +long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed()); try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) { log.debug("read ledger entries. start: {}, end: {}", start, end); +List entries = Lists.newLinkedList(); + Iterator iterator = ledgerEntriesOnce.iterator(); -long entryId = start; while (iterator.hasNext()) { LedgerEntry entry = iterator.next(); int entryLength = (int) entry.getLength(); -entryId = entry.getEntryId(); - -if (blockSize - blockBytesHave >= entryLength + entryHeaderSize) { -// data block has space for this entry, keep this entry -CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(); -ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(entryHeaderSize, entryHeaderSize); - -entryHeaderBuf.writeInt(entryLength).writeLong(entryId); -entryBuf.addComponents(entryHeaderBuf, entry.getEntryBuffer().retain()); -entryBuf.writerIndex(entryHeaderSize + entryLength); - -entriesByteBuf.add(entryBuf); - -// set counters -blockEntryCount++; -payloadBytesHave += entryLength; -blockBytesHave += entryLength + entryHeaderSize; -} else { -// data block has no space left for a whole message entry -dataBlockFullOffset = blockBytesHave; -break; -} +long entryId = entry.getEntryId(); + +CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(); +ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE); + +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185496454 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +} -if (entryByteBuf.readableBytes() == 0) { -entryByteBuf.release(); -entriesByteBuf.remove(0); +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +if (entryByteBuf.readableBytes() == 0) { +entryByteBuf.release(); +entriesByteBuf.remove(0); +blockEntryCount++; +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { +// not able to place a new Entry. +entriesByteBuf.forEach(buf -> buf.release()); +entriesByteBuf.clear(); +dataBlockFullOffset = bytesReadOffset; } - -return ret; -} catch (InterruptedException | ExecutionException e) { -log.error("Exception when get CompletableFuture. 
", e); -throw new IOException(e); } -} -// read entries from ledger, and pre-handle the returned ledgerEntries. -private void readNextEntriesFromLedger() throws InterruptedException, ExecutionException { -checkState(bytesReadOffset == blockBytesHave); +return ret; +} -long start = startEntryId + blockEntryCount; -long end = Math.min(start + entriesNumberEachRead - 1, ledger.getLastAddConfirmed()); +private List readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException { +long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed()); try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) { log.debug("read ledger entries. start: {}, end: {}", start, end); +List entries = Lists.newLinkedList(); + Iterator iterator = ledgerEntriesOnce.iterator(); -long entryId = start; while (iterator.hasNext()) { LedgerEntry entry = iterator.next(); int entryLength = (int) entry.getLength(); -entryId = entry.getEntryId(); - -if (blockSize - blockBytesHave >= entryLength + entryHeaderSize) { -// data block has space for this entry, keep this entry -CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(); -ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(entryHeaderSize, entryHeaderSize); - -entryHeaderBuf.writeInt(entryLength).writeLong(entryId); -entryBuf.addComponents(entryHeaderBuf, entry.getEntryBuffer().retain()); -entryBuf.writerIndex(entryHeaderSize + entryLength); - -entriesByteBuf.add(entryBuf); - -// set counters -blockEntryCount++; -payloadBytesHave += entryLength; -blockBytesHave += entryLength + entryHeaderSize; -} else { -// data block has no space left for a whole message entry -dataBlockFullOffset = blockBytesHave; -break; -} +long entryId = entry.getEntryId(); + +CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(); Review comment: nit: specify that the compositeBuffer will have 2 buffers since, we know it will.
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185495929 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -90,68 +84,60 @@ private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); checkState(bytesReadOffset < dataBlockFullOffset); -try { -// once reach the end of entry buffer, start a new read. -if (entriesByteBuf.isEmpty()) { -readNextEntriesFromLedger(); -log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", -bytesReadOffset, blockBytesHave); -} - -// always read from the first ByteBuf in the list, once read all of its content remove it. -ByteBuf entryByteBuf = entriesByteBuf.get(0); -int ret = entryByteBuf.readByte(); -bytesReadOffset ++; +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ); +} -if (entryByteBuf.readableBytes() == 0) { -entryByteBuf.release(); -entriesByteBuf.remove(0); +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +if (entryByteBuf.readableBytes() == 0) { +entryByteBuf.release(); +entriesByteBuf.remove(0); +blockEntryCount++; +if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) { Review comment: What if entriesByteBuf is empty, but the first entry in the next read block will put you over the blockSize? This check needs to move to before line 92 (that's why I had the if-else in previous comment). This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185492114 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { Review comment: thanks, will do it later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185491684 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. 
+private List entriesByteBuf = null; + +public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset(); +this.payloadBytesHave = 0; +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < dataBlockFullOffset); + +try { +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +readNextEntriesFromLedger(); +log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", +bytesReadOffset, blockBytesHave); +} + +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +
[GitHub] zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
zhaijack commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185464725 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; Review comment: Thanks, it can be, would like to keep it , or change it to List, this is more clear for code reader. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185438219 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. 
+private List entriesByteBuf = null; + +public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset(); +this.payloadBytesHave = 0; +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < dataBlockFullOffset); + +try { +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +readNextEntriesFromLedger(); +log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", +bytesReadOffset, blockBytesHave); +} + +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441127 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); Review comment: BLOCK_PADDING This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456144 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. 
+private List entriesByteBuf = null; + +public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset(); +this.payloadBytesHave = 0; +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < dataBlockFullOffset); + +try { +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +readNextEntriesFromLedger(); +log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", +bytesReadOffset, blockBytesHave); +} + +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456500 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { Review comment: Could we define an interface and impl for this, so it's clear what extra data BlockAwareSegmentInputStream exposes? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441005 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; Review comment: ENTRY_HEADER_SIZE This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185441047 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; Review comment: ENTRIES_PER_READ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185453817 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. 
+private List entriesByteBuf = null; + +public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset(); +this.payloadBytesHave = 0; +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < dataBlockFullOffset); + +try { +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +readNextEntriesFromLedger(); +log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", +bytesReadOffset, blockBytesHave); +} + +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185435514 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. +private List entriesByteBuf = null; Review comment: It can be a List<ByteBuf>. The user of the buffer doesn't need to know what's backing it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test
ivankelly commented on a change in pull request #1678: PIP-17: provide BlockAwareSegmentInputStream implementation and test URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185456251 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java ## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. 
+ * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends InputStream { +private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + +private static final byte[] blockEndPadding = Ints.toByteArray(0xFEDCDEAD); + +private final ReadHandle ledger; +private final long startEntryId; +private final int blockSize; + +// Number of Message entries that read from ledger and been readout from this InputStream. +private int blockEntryCount; +// Number of payload Bytes read from ledger, and has been has been kept in this InputStream. +private int payloadBytesHave; +// Number of bytes that has been kept in this InputStream. +private int blockBytesHave; + +// tracking read status for both header and entries. +// Bytes that already been read from this InputStream +private int bytesReadOffset = 0; +// Byte from this index is all padding byte +private int dataBlockFullOffset; +private final InputStream dataBlockHeaderStream; + +// how many entries want to read from ReadHandle each time. +private static final int entriesNumberEachRead = 100; +// buf the entry size and entry id. +private static final int entryHeaderSize = 4 /* entry size*/ + 8 /* entry id */; +// Keep a list of all entries ByteBuf, each element contains 2 buf: entry header and entry content. 
+private List entriesByteBuf = null; + +public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { +this.ledger = ledger; +this.startEntryId = startEntryId; +this.blockSize = blockSize; +this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream(); +this.blockBytesHave = DataBlockHeaderImpl.getDataStartOffset(); +this.payloadBytesHave = 0; +this.blockEntryCount = 0; +this.dataBlockFullOffset = blockSize; +this.entriesByteBuf = Lists.newLinkedList(); +} + +// read ledger entries. +private int readEntries() throws IOException { +checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); +checkState(bytesReadOffset < dataBlockFullOffset); + +try { +// once reach the end of entry buffer, start a new read. +if (entriesByteBuf.isEmpty()) { +readNextEntriesFromLedger(); +log.debug("After readNextEntriesFromLedger: bytesReadOffset: {}, blockBytesHave: {}", +bytesReadOffset, blockBytesHave); +} + +// always read from the first ByteBuf in the list, once read all of its content remove it. +ByteBuf entryByteBuf = entriesByteBuf.get(0); +int ret = entryByteBuf.readByte(); +bytesReadOffset ++; + +
[GitHub] ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation
ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-385912117 Weird. It looks like the test hangs on MessageIdTest for over 2 hours This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation
ivankelly commented on issue #1669: PIP-17: provide DataBlockHeader and implementation URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-385912117 Weird. It looks like the test hangs on MessageIdTest for over 2 hours. Obviously nothing to do with this change though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #1687: Don't offload empty ledgers
ivankelly commented on issue #1687: Don't offload empty ledgers URL: https://github.com/apache/incubator-pulsar/pull/1687#issuecomment-385909859 retest this please // timeout on unit tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #1639: Rest API for Ledger Offloading
ivankelly commented on issue #1639: Rest API for Ledger Offloading URL: https://github.com/apache/incubator-pulsar/pull/1639#issuecomment-385909641 @sijie pinging. This is ready for review again. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch branch-1.22 updated: Fix: deadlock while closing non-persistent topic
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 1a31f91 Fix: deadlock while closing non-persistent topic 1a31f91 is described below commit 1a31f91660de11060e0286e83ea35ef745e596f7 Author: Jai Asher AuthorDate: Wed May 2 01:28:40 2018 -0700 Fix: deadlock while closing non-persistent topic --- .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 44a9e14..5d6b4ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -462,8 +462,12 @@ public class NonPersistentTopic implements Topic { FutureUtil.waitForAll(futures).thenRun(() -> { log.info("[{}] Topic closed", topic); -brokerService.pulsar().getExecutor().submit(() -> brokerService.removeTopicFromCache(topic)); -closeFuture.complete(null); +// unload topic iterates over topics map and removing from the map with the same thread creates deadlock. +// so, execute it in different thread +brokerService.executor().execute(() -> { +brokerService.removeTopicFromCache(topic); +closeFuture.complete(null); +}); }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); isFenced = false; -- To stop receiving notification emails like this one, please contact j...@apache.org.
[incubator-pulsar] branch branch-1.22 updated: Relocate service files for shading `pulsar-client-admin` module
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 4c946b6 Relocate service files for shading `pulsar-client-admin` module 4c946b6 is described below commit 4c946b6d70172347d07057f8cb5a2b33ca327d3c Author: Jai AsherAuthorDate: Wed May 2 01:28:21 2018 -0700 Relocate service files for shading `pulsar-client-admin` module --- pom.xml| 2 +- pulsar-client-admin-shaded/pom.xml | 4 pulsar-client-shaded/pom.xml | 4 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b4e63d7..daa6c5a 100644 --- a/pom.xml +++ b/pom.xml @@ -805,7 +805,7 @@ flexible messaging model and an intuitive client API. org.apache.maven.plugins maven-shade-plugin - 2.4.2 + 3.1.0 maven-enforcer-plugin diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index c5624fe..f72243f 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -185,6 +185,10 @@ org.apache.pulsar.admin.shade.org.reactivestreams + + + + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 81f873e..9219c1f 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -160,6 +160,10 @@ org.apache.pulsar.shade.org.apache.http + + + + -- To stop receiving notification emails like this one, please contact j...@apache.org.
[GitHub] jai1 closed pull request #1712: Fix: NPE when cursor failed to close empty subscription
jai1 closed pull request #1712: Fix: NPE when cursor failed to close empty subscription URL: https://github.com/apache/incubator-pulsar/pull/1712 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e742f148cb..5e6153ba40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -533,7 +533,9 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { disconnectFuture.complete(null); }).exceptionally(exception -> { IS_FENCED_UPDATER.set(this, FALSE); -dispatcher.reset(); +if (dispatcher != null) { +dispatcher.reset(); +} log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception); disconnectFuture.completeExceptionally(exception); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 closed pull request #1713: Fix: deadlock while closing non-persistent topic
jai1 closed pull request #1713: Fix: deadlock while closing non-persistent topic URL: https://github.com/apache/incubator-pulsar/pull/1713 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 44a9e14434..5d6b4baf66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -462,8 +462,12 @@ void removeSubscription(String subscriptionName) { FutureUtil.waitForAll(futures).thenRun(() -> { log.info("[{}] Topic closed", topic); -brokerService.pulsar().getExecutor().submit(() -> brokerService.removeTopicFromCache(topic)); -closeFuture.complete(null); +// unload topic iterates over topics map and removing from the map with the same thread creates deadlock. +// so, execute it in different thread +brokerService.executor().execute(() -> { +brokerService.removeTopicFromCache(topic); +closeFuture.complete(null); +}); }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); isFenced = false; This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 closed pull request #1714: Relocate service files for shading `pulsar-client-admin` module (#1370)
jai1 closed pull request #1714: Relocate service files for shading `pulsar-client-admin` module (#1370) URL: https://github.com/apache/incubator-pulsar/pull/1714 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index b4e63d7211..daa6c5a704 100644 --- a/pom.xml +++ b/pom.xml @@ -805,7 +805,7 @@ flexible messaging model and an intuitive client API. org.apache.maven.plugins maven-shade-plugin - 2.4.2 + 3.1.0 maven-enforcer-plugin diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index c5624fe022..f72243f2a8 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -185,6 +185,10 @@ org.apache.pulsar.admin.shade.org.reactivestreams + + + + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 81f873e742..9219c1f718 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -160,6 +160,10 @@ org.apache.pulsar.shade.org.apache.http + + + + This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch branch-1.22 updated: Fix: NPE when cursor failed to close empty subscription
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 5ce64ff Fix: NPE when cursor failed to close empty subscription 5ce64ff is described below commit 5ce64ff0f2196ad791b2e19c64772c11a63e0030 Author: Jai Asher AuthorDate: Wed May 2 01:27:53 2018 -0700 Fix: NPE when cursor failed to close empty subscription --- .../pulsar/broker/service/persistent/PersistentSubscription.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e742f14..5e6153b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -533,7 +533,9 @@ public class PersistentSubscription implements Subscription { disconnectFuture.complete(null); }).exceptionally(exception -> { IS_FENCED_UPDATER.set(this, FALSE); -dispatcher.reset(); +if (dispatcher != null) { +dispatcher.reset(); +} log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception); disconnectFuture.completeExceptionally(exception); -- To stop receiving notification emails like this one, please contact j...@apache.org.
[incubator-pulsar] branch branch-1.22 updated: Issue #1117: handle race in concurrent bundle split
This is an automated email from the ASF dual-hosted git repository. jai1 pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new dd5c425 Issue #1117: handle race in concurrent bundle split dd5c425 is described below commit dd5c425f9b9df0de4852a49a9d8f731020271320 Author: Jai AsherAuthorDate: Wed May 2 01:27:05 2018 -0700 Issue #1117: handle race in concurrent bundle split --- .../broker/cache/LocalZooKeeperCacheService.java | 21 ++- .../pulsar/broker/namespace/NamespaceService.java | 171 ++--- .../pulsar/broker/service/BrokerService.java | 13 +- .../common/naming/NamespaceBundleFactory.java | 32 ++-- .../pulsar/common/naming/NamespaceBundles.java | 18 ++- .../apache/pulsar/broker/admin/AdminApiTest.java | 115 ++ .../broker/namespace/NamespaceServiceTest.java | 10 +- .../pulsar/zookeeper/ZooKeeperDataCache.java | 8 + 8 files changed, 306 insertions(+), 82 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java index 66a1ffa..4b28cad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import com.google.common.collect.Maps; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -38,6 +40,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; 
+import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,20 +82,28 @@ public class LocalZooKeeperCacheService { @Override public CompletableFuture getAsync(String path) { -CompletableFuture future = new CompletableFuture<>(); +return getWithStatAsync(path).thenApply(entry -> entry.map(e -> e.getKey())); +} + +@Override +public CompletableFuture >> getWithStatAsync(String path) { +CompletableFuture >> future = new CompletableFuture<>(); // First check in local-zk cache -super.getAsync(path).thenAccept(localPolicies -> { +super.getWithStatAsync(path).thenAccept(result -> { +Optional localPolicies = result.map(Entry::getKey); if (localPolicies.isPresent()) { -future.complete(localPolicies); +future.complete(result); } else { // create new policies node under Local ZK by coping it from Global ZK createPolicies(path, true).thenAccept(p -> { LOG.info("Successfully created local policies for {} -- {}", path, p); // local-policies have been created but it's not part of policiesCache. 
so, call // super.getAsync() which will load it and set the watch on local-policies path -super.getAsync(path); -future.complete(p); +super.getWithStatAsync(path); +Stat stat = new Stat(); +stat.setVersion(-1); + future.complete(Optional.of(Maps.immutableEntry(p.orElse(null), stat))); }).exceptionally(ex -> { future.completeExceptionally(ex); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e7b885a..b8e4ae0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; -import static
[GitHub] jai1 closed pull request #1711: Fixed Lookup redirect logic on Proxy side
jai1 closed pull request #1711: Fixed Lookup redirect logic on Proxy side URL: https://github.com/apache/incubator-pulsar/pull/1711 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 7d4d683ba1..148a7a4893 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -125,7 +125,7 @@ private void performLookup(long clientRequestId, String topic, String brokerServ requestId).thenAccept(result -> { if (result.redirect) { // Need to try the lookup again on a different broker -performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1); +performLookup(clientRequestId, topic, result.brokerUrl, result.authoritative, numberOfRetries - 1); } else { // We have the result immediately String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl; This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services