This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6ef7aca optimizing throughput in Pulsar Presto connector (#2564) 6ef7aca is described below commit 6ef7acaf37d57769c4d9dbf7558ef627ce061339 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Sep 13 14:50:01 2018 -0700 optimizing throughput in Pulsar Presto connector (#2564) ### Motivation 1. Currently, the presto pulsar connector will read synchronously from bookkeeper when it has run out of entries to process. Basically, we process a batch of entries and then we read more. Ideally, we should be reading and processing in parallel to increase throughput. 2. Each split initializes its own ManagedLedgerFactory/Bookkeeper client. We really just need one bookkeeper client to be shared among threads. ### Modifications 1. Rewrote the logic in the Presto Pulsar connector to read async and process in parallel 2. Cache ManagedLedgerFactory to be used across splits ### Result I see about 2X throughput improvement on a single node as well as on a cluster (2 brokers, 3 bookies, 4 presto workers including coordinator) on AWS --- conf/presto/catalog/pulsar.properties | 6 +- .../apache/pulsar/sql/presto/PulsarConnector.java | 5 + .../pulsar/sql/presto/PulsarConnectorCache.java | 64 +++++++ .../pulsar/sql/presto/PulsarConnectorConfig.java | 26 ++- .../pulsar/sql/presto/PulsarRecordCursor.java | 201 +++++++++++++++------ .../pulsar/sql/presto/TestPulsarConnector.java | 146 ++++++++------- .../pulsar/sql/presto/TestPulsarRecordCursor.java | 1 + 7 files changed, 326 insertions(+), 123 deletions(-) diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 23b945e..5f922e5 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181 # minimum number of entries to read at a single time pulsar.entry-read-batch-size=100 # default number of splits to use per query 
-pulsar.target-num-splits=4 +pulsar.target-num-splits=2 +# max message queue size +pulsar.max-split-message-queue-size=10000 +# max entry queue size +pulsar.max-split-entry-queue-size = 1000 diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java index 1d89b51..498583d 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java @@ -87,6 +87,11 @@ public class PulsarConnector implements Connector { log.error(e, "Failed to close pulsar connector"); } try { + PulsarConnectorCache.shutdown(); + } catch (Exception e) { + log.error("Failed to shutdown pulsar connector cache"); + } + try { lifeCycleManager.stop(); } catch (Exception e) { log.error(e, "Error shutting down connector"); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java new file mode 100644 index 0000000..d13ddcd --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration; + +public class PulsarConnectorCache { + + private static PulsarConnectorCache instance; + + private final ManagedLedgerFactory managedLedgerFactory; + + private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { + this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig); + } + + public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { + synchronized (PulsarConnectorCache.class) { + if (instance == null) { + instance = new PulsarConnectorCache(pulsarConnectorConfig); + } + } + return instance; + } + + private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { + ClientConfiguration bkClientConfiguration = new ClientConfiguration() + .setZkServers(pulsarConnectorConfig.getZookeeperUri()) + .setAllowShadedLedgerManagerFactoryClass(true) + .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.") + .setReadEntryTimeout(60); + return new ManagedLedgerFactoryImpl(bkClientConfiguration); + } + + public ManagedLedgerFactory getManagedLedgerFactory() { + return managedLedgerFactory; + } + + public static void shutdown() throws ManagedLedgerException, 
InterruptedException { + if (instance != null) { + instance.managedLedgerFactory.shutdown(); + instance = null; + } + } +} diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index 1f574c6..482fab3 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -29,7 +29,9 @@ public class PulsarConnectorConfig implements AutoCloseable { private String brokerServiceUrl = "http://localhost:8080"; private String zookeeperUri = "localhost:2181"; private int entryReadBatchSize = 100; - private int targetNumSplits = 4; + private int targetNumSplits = 2; + private int maxSplitMessageQueueSize = 10000; + private int maxSplitEntryQueueSize = 1000; private PulsarAdmin pulsarAdmin; @NotNull @@ -77,6 +79,28 @@ public class PulsarConnectorConfig implements AutoCloseable { } @NotNull + public int getMaxSplitMessageQueueSize() { + return this.maxSplitMessageQueueSize; + } + + @Config("pulsar.max-split-message-queue-size") + public PulsarConnectorConfig setMaxSplitMessageQueueSize(int maxSplitMessageQueueSize) { + this.maxSplitMessageQueueSize = maxSplitMessageQueueSize; + return this; + } + + @NotNull + public int getMaxSplitEntryQueueSize() { + return this.maxSplitEntryQueueSize; + } + + @Config("pulsar.max-split-entry-queue-size") + public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSize) { + this.maxSplitEntryQueueSize = maxSplitEntryQueueSize; + return this; + } + + @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null) { this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build(); diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index ef56e6c..c8106aa 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -28,6 +28,7 @@ import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import org.apache.avro.Schema; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -44,10 +45,11 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration; import java.io.IOException; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.type.BigintType.BIGINT; @@ -62,35 +64,47 @@ import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_W import static com.facebook.presto.spi.type.TinyintType.TINYINT; import static com.google.common.base.Preconditions.checkArgument; + public class PulsarRecordCursor implements RecordCursor { private List<PulsarColumnHandle> columnHandles; private PulsarSplit pulsarSplit; private PulsarConnectorConfig pulsarConnectorConfig; - private ManagedLedgerFactory managedLedgerFactory; private ReadOnlyCursor cursor; - private Queue<Message> messageQueue = new LinkedList<>(); + private ArrayBlockingQueue<Message> messageQueue; + private ArrayBlockingQueue<Entry> 
entryQueue; private Object currentRecord; private Message currentMessage; private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap(); private SchemaHandler schemaHandler; private int batchSize; - private long completedBytes = 0L; + private AtomicLong completedBytes = new AtomicLong(0L); + private ReadEntries readEntries; + private DeserializeEntries deserializeEntries; + private TopicName topicName; private static final Logger log = Logger.get(PulsarRecordCursor.class); + private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { + ClientConfiguration bkClientConfiguration = new ClientConfiguration() + .setZkServers(pulsarConnectorConfig.getZookeeperUri()) + .setAllowShadedLedgerManagerFactoryClass(true) + .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade."); + return new ManagedLedgerFactoryImpl(bkClientConfiguration); + } + public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig pulsarConnectorConfig) { - - ManagedLedgerFactory managedLedgerFactory; + PulsarConnectorCache pulsarConnectorCache; try { - managedLedgerFactory = getManagedLedgerFactory(pulsarConnectorConfig); + pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig); } catch (Exception e) { - log.error(e, "Failed to initialize managed ledger factory"); + log.error(e, "Failed to initialize Pulsar connector cache"); close(); throw new RuntimeException(e); } - initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory); + initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, + pulsarConnectorCache.getManagedLedgerFactory()); } // Exposed for testing purposes @@ -105,7 +119,11 @@ public class PulsarRecordCursor implements RecordCursor { this.pulsarSplit = pulsarSplit; this.pulsarConnectorConfig = pulsarConnectorConfig; this.batchSize = 
pulsarConnectorConfig.getEntryReadBatchSize(); - this.managedLedgerFactory = managedLedgerFactory; + this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize()); + this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize()); + this.topicName = TopicName.get("persistent", + NamespaceName.get(pulsarSplit.getSchemaName()), + pulsarSplit.getTableName()); Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema()); @@ -149,18 +167,9 @@ public class PulsarRecordCursor implements RecordCursor { return cursor; } - private ManagedLedgerFactory getManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { - ClientConfiguration bkClientConfiguration = new ClientConfiguration() - .setZkServers(pulsarConnectorConfig.getZookeeperUri()) - .setAllowShadedLedgerManagerFactoryClass(true) - .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade."); - return new ManagedLedgerFactoryImpl(bkClientConfiguration); - } - - @Override public long getCompletedBytes() { - return this.completedBytes; + return this.completedBytes.get(); } @Override @@ -174,39 +183,48 @@ public class PulsarRecordCursor implements RecordCursor { return columnHandles.get(field).getType(); } - @Override - public boolean advanceNextPosition() { + @VisibleForTesting + class DeserializeEntries implements Runnable { - if (this.messageQueue.isEmpty()) { - if (!this.cursor.hasMoreEntries()) { - return false; - } - if (((PositionImpl) this.cursor.getReadPosition()) - .compareTo(this.pulsarSplit.getEndPosition()) >= 0) { - return false; - } + protected AtomicBoolean isRunning = new AtomicBoolean(false); - TopicName topicName = TopicName.get("persistent", - NamespaceName.get(this.pulsarSplit.getSchemaName()), - this.pulsarSplit.getTableName()); + private final Thread thread; - List<Entry> newEntries; - try { - newEntries = this.cursor.readEntries(this.batchSize); - } catch (InterruptedException | 
ManagedLedgerException e) { - log.error(e, "Failed to read new entries from pulsar topic %s", topicName.toString()); - throw new RuntimeException(e); - } + public DeserializeEntries() { + this.thread = new Thread(this); + } + + public void interrupt() { + isRunning.set(false); + thread.interrupt(); + } + + public void start() { + this.thread.start(); + } - newEntries.forEach(entry -> { + @Override + public void run() { + isRunning.set(true); + while (isRunning.get()) { + Entry entry; try { - completedBytes += entry.getDataBuffer().readableBytes(); + entry = entryQueue.take(); + } catch (InterruptedException e) { + break; + } + try { + completedBytes.addAndGet(entry.getDataBuffer().readableBytes()); // filter entries that is not part of my split if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) { try { MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(), (messageId, message, byteBuf) -> { - messageQueue.add(message); + try { + messageQueue.put(message); + } catch (InterruptedException e) { + //no-op + } }); } catch (IOException e) { log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString()); @@ -216,15 +234,92 @@ public class PulsarRecordCursor implements RecordCursor { } finally { entry.release(); } - }); + } } + } - this.currentMessage = this.messageQueue.poll(); - currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData()); + @VisibleForTesting + class ReadEntries implements AsyncCallbacks.ReadEntriesCallback { + + // indicate whether there are any additional entries left to read + private final AtomicBoolean isDone = new AtomicBoolean(false); + + //num of outstanding read requests + // set to 1 because we can only read one batch a time + private final AtomicLong outstandingReadsRequests = new AtomicLong(1); + + public void run() { + + if (outstandingReadsRequests.get() > 0) { + if (!cursor.hasMoreEntries() || ((PositionImpl) 
cursor.getReadPosition()) + .compareTo(pulsarSplit.getEndPosition()) >= 0) { + isDone.set(true); + + } else if (entryQueue.remainingCapacity() > batchSize) { + outstandingReadsRequests.decrementAndGet(); + cursor.asyncReadEntries(batchSize, this, System.currentTimeMillis()); + } + } + } + + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + entryQueue.addAll(entries); + outstandingReadsRequests.incrementAndGet(); + } + + public boolean hashFinished() { + return messageQueue.isEmpty() && entryQueue.isEmpty() && isDone.get() && outstandingReadsRequests.get() >=1; + } + + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + log.debug(exception, "Failed to read entries from topic %s", topicName.toString()); + outstandingReadsRequests.incrementAndGet(); + } + } + + + @Override + public boolean advanceNextPosition() { + + if (readEntries == null) { + readEntries = new ReadEntries(); + readEntries.run(); + + // start deserialize thread + deserializeEntries = new DeserializeEntries(); + deserializeEntries.start(); + } + + while(true) { + if (readEntries.hashFinished()) { + return false; + } + + if (messageQueue.remainingCapacity() > 0) { + readEntries.run(); + } + + currentMessage = messageQueue.poll(); + if (currentMessage != null) { + break; + } else { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData()); return true; } + @VisibleForTesting Object getRecord(int fieldIndex) { if (this.currentRecord == null) { @@ -317,17 +412,13 @@ public class PulsarRecordCursor implements RecordCursor { @Override public void close() { - if (this.cursor != null) { - try { - this.cursor.close(); - } catch (Exception e) { - log.error(e); - } + if (deserializeEntries != null) { + deserializeEntries.interrupt(); } - if (managedLedgerFactory != null) { + if (this.cursor != null) { try { - 
managedLedgerFactory.shutdown(); + this.cursor.close(); } catch (Exception e) { log.error(e); } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 0882efc..5d8472d 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -27,6 +27,7 @@ import com.facebook.presto.spi.type.RealType; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.VarcharType; import io.airlift.log.Logger; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.Position; @@ -68,6 +69,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -662,6 +664,9 @@ public abstract class TestPulsarConnector { @BeforeMethod public void setup() throws Exception { this.pulsarConnectorConfig = spy(new PulsarConnectorConfig()); + this.pulsarConnectorConfig.setEntryReadBatchSize(1); + this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10); + this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100); Tenants tenants = mock(Tenants.class); doReturn(new LinkedList<>(topicNames.stream().map(new Function<TopicName, String>() { @@ -786,77 +791,86 @@ public abstract class TestPulsarConnector { } }); - when(readOnlyCursor.readEntries(anyInt())).thenAnswer(new Answer<List<Entry>>() { + doAnswer(new Answer() { @Override - public List<Entry> answer(InvocationOnMock invocationOnMock) throws Throwable { + public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable { Object[] args = invocationOnMock.getArguments(); Integer readEntries = (Integer) args[0]; + AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback) args[1]; + Object ctx = args[2]; + + new Thread(new Runnable() { + @Override + public void run() { + List < Entry > entries = new LinkedList<>(); + for (int i = 0; i < readEntries; i++) { + + Foo.Bar foobar = new Foo.Bar(); + foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count); + + Boo boo1 = new Boo(); + boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count); + boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count); + boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count); + boo1.foo = new Foo(); + boo1.boo = null; + boo1.bar = new Bar(); + boo1.foobar = foobar; + + Boo boo2 = new Boo(); + boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count); + boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count); + boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count); + boo2.foo = new Foo(); + boo2.boo = boo1; + boo2.bar = new Bar(); + boo2.foobar = foobar; + + TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar(); + bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count); + bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count); + bar.field3 = (float) fooFunctions.get("bar.field3").apply(count); + bar.test = boo1; + bar.test2 = count % 2 == 0 ? 
null : boo2; + + Foo foo = new Foo(); + foo.field1 = (int) fooFunctions.get("field1").apply(count); + foo.field2 = (String) fooFunctions.get("field2").apply(count); + foo.field3 = (float) fooFunctions.get("field3").apply(count); + foo.field4 = (double) fooFunctions.get("field4").apply(count); + foo.field5 = (boolean) fooFunctions.get("field5").apply(count); + foo.field6 = (long) fooFunctions.get("field6").apply(count); + foo.timestamp = (long) fooFunctions.get("timestamp").apply(count); + foo.time = (int) fooFunctions.get("time").apply(count); + foo.date = (int) fooFunctions.get("date").apply(count); + foo.bar = bar; + + PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder() + .setProducerName("test-producer").setSequenceId(positions.get(topic)) + .setPublishTime(System.currentTimeMillis()).build(); + + Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class); + + org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload + = org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo)); + + ByteBuf byteBuf = serializeMetadataAndPayload + (Commands.ChecksumType.Crc32c, messageMetadata, payload); + + completedBytes += byteBuf.readableBytes(); + + entries.add(EntryImpl.create(0, positions.get(topic), byteBuf)); + positions.put(topic, positions.get(topic) + 1); + count++; + } + + callback.readEntriesComplete(entries, ctx); + } + }).start(); - List<Entry> entries = new LinkedList<>(); - for (int i = 0; i < readEntries; i++) { - - Foo.Bar foobar = new Foo.Bar(); - foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count); - - Boo boo1 = new Boo(); - boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count); - boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count); - boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count); - boo1.foo = new Foo(); - boo1.boo = null; - boo1.bar = new Bar(); 
- boo1.foobar = foobar; - - Boo boo2 = new Boo(); - boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count); - boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count); - boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count); - boo2.foo = new Foo(); - boo2.boo = boo1; - boo2.bar = new Bar(); - boo2.foobar = foobar; - - TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar(); - bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count); - bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count); - bar.field3 = (float) fooFunctions.get("bar.field3").apply(count); - bar.test = boo1; - bar.test2 = count % 2 == 0 ? null : boo2; - - Foo foo = new Foo(); - foo.field1 = (int) fooFunctions.get("field1").apply(count); - foo.field2 = (String) fooFunctions.get("field2").apply(count); - foo.field3 = (float) fooFunctions.get("field3").apply(count); - foo.field4 = (double) fooFunctions.get("field4").apply(count); - foo.field5 = (boolean) fooFunctions.get("field5").apply(count); - foo.field6 = (long) fooFunctions.get("field6").apply(count); - foo.timestamp = (long) fooFunctions.get("timestamp").apply(count); - foo.time = (int) fooFunctions.get("time").apply(count); - foo.date = (int) fooFunctions.get("date").apply(count); - foo.bar = bar; - - PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder() - .setProducerName("test-producer").setSequenceId(positions.get(topic)) - .setPublishTime(System.currentTimeMillis()).build(); - - Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? 
AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class); - - org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload - = org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo)); - - ByteBuf byteBuf = serializeMetadataAndPayload - (Commands.ChecksumType.Crc32c, messageMetadata, payload); - - completedBytes += byteBuf.readableBytes(); - - entries.add(EntryImpl.create(0, positions.get(topic), byteBuf)); - positions.put(topic, positions.get(topic) + 1); - count++; - } - - return entries; + return null; } - }); + }).when(readOnlyCursor).asyncReadEntries(anyInt(), any(), any()); when(readOnlyCursor.hasMoreEntries()).thenAnswer(new Answer<Boolean>() { @Override diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java index 1b323a2..b0fc42a 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java @@ -122,6 +122,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector { Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue()); Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes); cleanup(); + pulsarRecordCursor.close(); } } }