[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-10-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r334362812
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##
 @@ -1289,16 +1364,15 @@ private void batchMessageAndSend() {
 }
 if (!batchMessageContainer.isEmpty()) {
 try {
+List<OpSendMsg> opSendMsgs;
 if (batchMessageContainer.isMultiBatches()) {
-List<OpSendMsg> opSendMsgs = batchMessageContainer.createOpSendMsgs();
-for (OpSendMsg opSendMsg : opSendMsgs) {
-processOpSendMsg(opSendMsg);
-}
+opSendMsgs = batchMessageContainer.createOpSendMsgs();
 } else {
-OpSendMsg opSendMsg = batchMessageContainer.createOpSendMsg();
-if (opSendMsg != null) {
-processOpSendMsg(opSendMsg);
-}
+opSendMsgs = Collections.singletonList(batchMessageContainer.createOpSendMsg());
 
 Review comment:
   It's ok to leave it as it is.
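The hunk takes the two possible results of the batch container (several ops, or a single op that may be null) and puts both into one `List`, which is then processed afterwards. Below is a minimal sketch of that shape. `Op` is a hypothetical stand-in for `OpSendMsg`, and `flush` is a hypothetical method that collects payloads instead of sending them. The loop that follows the quoted lines is not shown in the diff, so the sketch keeps the `opSendMsg != null` guard from the removed branch. It does this because `Collections.singletonList(null)` produces a one-element list that contains `null`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BatchFlushSketch {
    // Hypothetical stand-in for OpSendMsg; not Pulsar's class.
    static final class Op {
        final String payload;

        Op(String payload) {
            this.payload = payload;
        }
    }

    // Normalize both container outcomes into one list, then process it in one loop.
    static List<String> flush(boolean multiBatches, List<Op> multiOps, Op singleOp) {
        List<Op> ops = multiBatches ? multiOps : Collections.singletonList(singleOp);
        List<String> sent = new ArrayList<>();
        for (Op op : ops) {
            // singletonList(null) is a one-element list holding null, so keep the
            // null guard that the old single-op branch had.
            if (op != null) {
                sent.add(op.payload);
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(flush(true, List.of(new Op("a"), new Op("b")), null)); // [a, b]
        System.out.println(flush(false, null, new Op("c")));                       // [c]
        System.out.println(flush(false, null, null));                              // []
    }
}
```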


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-10-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r334361724
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
 ##
 @@ -73,6 +81,14 @@ public MessageId send(T message) throws PulsarClientException {
 return new TypedMessageBuilderImpl<>(this, schema);
 }
 
+// TODO: same parameterized type `T` with producer for the moment,
 
 Review comment:
   Should I just edit the master issue and add a checklist, or open a new issue for each TODO?


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-10-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r334355056
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
 ##
 @@ -1356,6 +1342,58 @@ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
 });
 }
 
+@Override
+protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
+if (log.isDebugEnabled()) {
+log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
+}
+long requestId = commandGetOrCreateSchema.getRequestId();
+String topicName = commandGetOrCreateSchema.getTopic();
+SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
+SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
+service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
+if (topicOpt.isPresent()) {
+Topic topic = topicOpt.get();
+CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
+schemaVersionFuture.exceptionally(ex -> {
+ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
+ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
+requestId, errorCode, ex.getMessage()));
+return null;
+}).thenAccept(schemaVersion -> {
+ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponse(
+requestId, schemaVersion));
+});
+} else {
+ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
+requestId, ServerError.TopicNotFound, "Topic not found"));
+}
+}).exceptionally(ex -> {
+ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
+ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
+requestId, errorCode, ex.getMessage()));
+return null;
+});
+}
+
+private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
+if (schema != null) {
+return topic.addSchema(schema);
+} else {
+return topic.hasSchema().thenCompose((hasSchema) -> {
+log.info("[{}] {} configured with schema {}",
 
 Review comment:
   `producer_id` is not carried in `CommandGetOrCreateSchema`. Do we actually need it here?
   Or should we log `-1` as a placeholder?
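The quoted `tryAddSchema` has two paths. If a schema was supplied, it registers that schema. If none was sent, it calls `topic.hasSchema()`. The hunk stops before the `else` branch is complete, so the sketch below assumes one plausible policy: a request without a schema is rejected once the topic already has one. It also logs `-1` as the placeholder producer id that the comment proposes. `FakeTopic`, `EMPTY_VERSION` and `NO_PRODUCER_ID` are hypothetical, and versions are plain `long` values rather than Pulsar's `SchemaVersion`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class GetOrCreateSchemaSketch {
    // Hypothetical in-memory topic standing in for the broker's Topic; not Pulsar's API.
    static final class FakeTopic {
        final List<String> versions = new ArrayList<>();

        // Register a schema and return its version (its index); re-adding is idempotent.
        CompletableFuture<Long> addSchema(String schema) {
            int idx = versions.indexOf(schema);
            if (idx < 0) {
                versions.add(schema);
                idx = versions.size() - 1;
            }
            return CompletableFuture.completedFuture((long) idx);
        }

        CompletableFuture<Boolean> hasSchema() {
            return CompletableFuture.completedFuture(!versions.isEmpty());
        }
    }

    static final long EMPTY_VERSION = -1L;
    // Placeholder id for log lines, since the command carries no producer_id.
    static final long NO_PRODUCER_ID = -1L;

    static CompletableFuture<Long> tryAddSchema(FakeTopic topic, String schema) {
        if (schema != null) {
            return topic.addSchema(schema);
        }
        return topic.hasSchema().thenCompose(hasSchema -> {
            System.out.printf("[%d] topic configured with schema: %b%n", NO_PRODUCER_ID, hasSchema);
            CompletableFuture<Long> result = new CompletableFuture<>();
            if (hasSchema) {
                // Assumed policy: a schema-less request is rejected once the topic has a schema.
                result.completeExceptionally(new IllegalStateException("topic requires a schema"));
            } else {
                result.complete(EMPTY_VERSION);
            }
            return result;
        });
    }

    public static void main(String[] args) {
        FakeTopic topic = new FakeTopic();
        System.out.println(tryAddSchema(topic, null).join());                     // -1
        System.out.println(tryAddSchema(topic, "avro-v1").join());                // 0
        System.out.println(tryAddSchema(topic, "avro-v2").join());                // 1
        System.out.println(tryAddSchema(topic, null).isCompletedExceptionally()); // true
    }
}
```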


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-10-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r334352968
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##
 @@ -1312,10 +1386,16 @@ private void batchMessageAndSend() {
 
 private void processOpSendMsg(OpSendMsg op) {
 try {
-batchMessageContainer.clear();
+if (op.msg != null && isBatchMessagingEnabled()) {
 
 Review comment:
   A non-null `op.msg` means a single, non-batched message, so we should flush the current batch first if needed.
   `batchMessageContainer.clear();` was moved into `batchMessageAndSend`,
   so `processOpSendMsg` no longer handles only batch messages.
   
   Don't we need this flush to preserve message order (or for some other reason)?
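The ordering concern can be shown with a toy model. Suppose a message bypasses the batch container while earlier messages are still waiting in it. That message would reach the broker first. Flushing the pending batch before sending the single message keeps the original order. `FlushBeforeSingleSketch` and its methods are hypothetical and only model the order of writes, not Pulsar's send path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a producer's send path; names are illustrative, not Pulsar's.
final class FlushBeforeSingleSketch {
    private final List<String> pendingBatch = new ArrayList<>();
    final List<String> wire = new ArrayList<>(); // what reaches the broker, in order

    void addToBatch(String msg) {
        pendingBatch.add(msg);
    }

    private void flushBatch() {
        if (!pendingBatch.isEmpty()) {
            wire.add("batch" + pendingBatch);
            pendingBatch.clear();
        }
    }

    // A message that cannot join the current batch is sent on its own, but only
    // after the pending batch, so earlier messages stay earlier on the wire.
    void sendSingle(String msg) {
        flushBatch();
        wire.add(msg);
    }

    public static void main(String[] args) {
        FlushBeforeSingleSketch p = new FlushBeforeSingleSketch();
        p.addToBatch("m1");
        p.addToBatch("m2");
        p.sendSingle("m3");
        System.out.println(p.wire); // [batch[m1, m2], m3]
    }
}
```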


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-21 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r326855318
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##
 @@ -430,6 +447,99 @@ public void sendAsync(Message message, SendCallback callback) {
 }
 }
 
+private boolean fillMessageSchema(MessageMetadata.Builder msgMetadataBuilder,
+  Schema msgSchema,
+  SendCallback callback) {
+if (msgSchema == schema) {
+schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+return true;
+}
+if (!isMultiSchemaEnabled(true)) {
+callback.sendComplete(new PulsarClientException.InvalidMessageException(
+"Multiple schema disabled"));
+return false;
+}
+byte[] schemaVersion;
+try {
+schemaVersion = schemaCache.computeIfAbsent(
+SchemaHash.of(msgSchema), (hash) -> {
+SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+.map(Schema::getSchemaInfo)
+.filter(si -> si.getType().getValue() > 0)
+.orElse(Schema.BYTES.getSchemaInfo());
+try {
+return getOrCreateSchemaAsync(schemaInfo).get();
 
 Review comment:
   First try pushed, PTAL.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-18 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325947509
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##
 @@ -430,6 +447,99 @@ public void sendAsync(Message message, SendCallback callback) {
 }
 }
 
+private boolean fillMessageSchema(MessageMetadata.Builder msgMetadataBuilder,
+  Schema msgSchema,
+  SendCallback callback) {
+if (msgSchema == schema) {
+schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+return true;
+}
+if (!isMultiSchemaEnabled(true)) {
+callback.sendComplete(new PulsarClientException.InvalidMessageException(
+"Multiple schema disabled"));
+return false;
+}
+byte[] schemaVersion;
+try {
+schemaVersion = schemaCache.computeIfAbsent(
+SchemaHash.of(msgSchema), (hash) -> {
+SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+.map(Schema::getSchemaInfo)
+.filter(si -> si.getType().getValue() > 0)
+.orElse(Schema.BYTES.getSchemaInfo());
+try {
+return getOrCreateSchemaAsync(schemaInfo).get();
 
 Review comment:
   Yes, I agree with executing the command asynchronously. It's a better implementation; I'll get it done.
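Calling `getOrCreateSchemaAsync(schemaInfo).get()` inside `computeIfAbsent` blocks the sending thread for a full broker round trip. If `schemaCache` is a `ConcurrentHashMap`, its `computeIfAbsent` documentation also asks for the mapping function to be short and simple, because other updates may block while it runs. One asynchronous alternative is to cache the pending future and let the send continue when that future completes. The sketch assumes a `ConcurrentHashMap` keyed by a schema-hash string. `lookup` is a hypothetical stand-in for `getOrCreateSchemaAsync`, and this is not the code that was pushed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class AsyncSchemaCacheSketch {
    private final ConcurrentHashMap<String, CompletableFuture<Long>> cache = new ConcurrentHashMap<>();
    // Hypothetical async lookup standing in for getOrCreateSchemaAsync.
    private final Function<String, CompletableFuture<Long>> lookup;

    AsyncSchemaCacheSketch(Function<String, CompletableFuture<Long>> lookup) {
        this.lookup = lookup;
    }

    CompletableFuture<Long> schemaVersion(String schemaHash) {
        // Only the pending future is stored; no thread blocks inside computeIfAbsent.
        CompletableFuture<Long> f = cache.computeIfAbsent(schemaHash, lookup);
        // Evict failed lookups so a later send can retry. This runs outside
        // computeIfAbsent, so a future that has already failed can be removed safely.
        f.whenComplete((v, ex) -> {
            if (ex != null) {
                cache.remove(schemaHash, f);
            }
        });
        return f;
    }

    int cachedLookups() {
        return cache.size();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        AsyncSchemaCacheSketch c = new AsyncSchemaCacheSketch(
                hash -> CompletableFuture.supplyAsync(() -> (long) calls.incrementAndGet()));
        // The send continues when the version arrives instead of calling get().
        c.schemaVersion("v1-hash")
                .thenAccept(v -> System.out.println("send with schema version " + v))
                .join();
        c.schemaVersion("v1-hash").join();
        System.out.println("broker lookups: " + calls.get()); // broker lookups: 1
    }
}
```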


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-17 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325024160
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 ##
 @@ -434,4 +434,18 @@
  * @return the producer builder instance
  */
 ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
+
+/**
+ * Option to control whether to enable multiple schema mode,
+ * which by default is enabled automatically if this feature is used.
+ * If enabled, the producer can send a message with a schema different from the one specified when it was created;
+ * otherwise an invalid message exception is thrown if the producer tries to send a message
+ * with a different schema.
+ *
+ * @param switchOption
+ * indicates to enable, disable, or automatically enable multiple schema mode
+ * @return the producer builder instance
+ * @since 2.5.0
+ */
+ProducerBuilder<T> withMultiSchema(SwitchOption switchOption);
 
 Review comment:
   Outdated, since the signature has changed.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-17 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325015192
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaHash.java
 ##
 @@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.util.Optional;
+
+@EqualsAndHashCode
+public class SchemaHash {
 
 Review comment:
   A TODO comment has been added.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-17 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325015007
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
 ##
 @@ -185,6 +206,155 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
 }
 }
 
+@Test
+public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
+String topic = "my-property/my-ns/schema-test";
+
+try (Producer<V1Data> ignored = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+.topic(topic).create()) {
+}
+try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
+  .topic(topic).create()) {
+p.send(new V2Data(-1, -1));
+}
+V1Data msgV1 = new V1Data(2);
+V2Data msgV2 = new V2Data(3, 5);
+byte[] contentV1 = new AvroWriter(
+ReflectData.AllowNull.get().getSchema(V1Data.class)).write(msgV1);
+byte[] contentV2 = new AvroWriter(
+ReflectData.AllowNull.get().getSchema(V2Data.class)).write(msgV2);
+try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+  .topic(topic).create();
+ Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
+  .topic(topic)
+  .subscriptionName("sub1").subscribe()) {
+Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));
+
+((ProducerBase)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
+ .value(contentV1).send();
+p.send(contentV2);
+V2Data msg1 = c.receive().getValue();
+Assert.assertEquals(msgV1.i, msg1.i);
+Assert.assertNull(msg1.j);
+Assert.assertEquals(msgV2, c.receive().getValue());
 
 Review comment:
   More assert checks added.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-17 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325014856
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerMultiSchemaMode.java
 ##
 @@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Multiple schema mode for producer,
+ * indicates to enable, disable, or automatically enable multiple schema.
+ */
+public enum ProducerMultiSchemaMode {
+/**
+ * Automatically enable multiple schema for producer if needed.
+ */
+Auto,
+
+/**
+ * Enable multiple schema mode for producer. In this mode, the producer can send a message
+ * with a schema different from the one specified when it was created.
+ */
+Enabled,
 
 Review comment:
   @codelipenghui it's fixed, PTAL.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-17 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325014623
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 ##
 @@ -257,6 +258,12 @@ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData c
 return this;
 }
 
+@Override
+public ProducerBuilder<T> withMultiSchemaMode(ProducerMultiSchemaMode producerMultiSchemaMode) {
+conf.setMultiSchemaMode(producerMultiSchemaMode);
 
 Review comment:
   No need to fix this, since the signature has changed.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-16 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324983119
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerMultiSchemaMode.java
 ##
 @@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Multiple schema mode for producer,
+ * indicates to enable, disable, or automatically enable multiple schema.
+ */
+public enum ProducerMultiSchemaMode {
+/**
+ * Automatically enable multiple schema for producer if needed.
+ */
+Auto,
+
+/**
+ * Enable multiple schema mode for producer. In this mode, the producer can send a message
+ * with a schema different from the one specified when it was created.
+ */
+Enabled,
 
 Review comment:
   It's great to separate these two parts. I'll fix it. Thanks.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-16 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324964473
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerMultiSchemaMode.java
 ##
 @@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Multiple schema mode for producer,
+ * indicates to enable, disable, or automatically enable multiple schema.
+ */
+public enum ProducerMultiSchemaMode {
+/**
+ * Automatically enable multiple schema for producer if needed.
+ */
+Auto,
+
+/**
+ * Enable multiple schema mode for producer. In this mode, the producer can send a message
+ * with a schema different from the one specified when it was created.
+ */
+Enabled,
 
 Review comment:
   `Enabled` mode adds extra checks on some code paths. `Auto` skips those checks and enables multiple schema mode only when it is needed.
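The trade-off between the modes fits in a few lines. The class Javadoc mentions three behaviors (enable, disable, and automatic enable), but only `Auto` and `Enabled` appear in the quoted hunk. The sketch therefore assumes a third, disabled value and models all three in a hypothetical `Mode` enum with `AUTO`, `ENABLED` and `DISABLED`. In this model, `Enabled` turns the extra checks on from the start. `Auto` turns them on the first time a message with a different schema is sent. This is an illustration of the described behavior, not Pulsar's implementation.

```java
// Hypothetical model of the mode semantics described in the review; not Pulsar's code.
final class MultiSchemaModeSketch {
    enum Mode { AUTO, ENABLED, DISABLED }

    private final Mode mode;
    private boolean multiSchemaActive;

    MultiSchemaModeSketch(Mode mode) {
        this.mode = mode;
        // ENABLED pays for the extra per-message checks from the start; AUTO starts without them.
        this.multiSchemaActive = mode == Mode.ENABLED;
    }

    // Called when a message carries a schema different from the producer's own.
    // Returns false when the send must be failed with an invalid-message error.
    boolean acceptDifferentSchema() {
        if (multiSchemaActive) {
            return true;
        }
        if (mode == Mode.AUTO) {
            multiSchemaActive = true; // switch the checks on only once they are needed
            return true;
        }
        return false; // DISABLED
    }

    boolean extraChecksActive() {
        return multiSchemaActive;
    }

    public static void main(String[] args) {
        MultiSchemaModeSketch auto = new MultiSchemaModeSketch(Mode.AUTO);
        System.out.println(auto.extraChecksActive());     // false
        System.out.println(auto.acceptDifferentSchema()); // true
        System.out.println(auto.extraChecksActive());     // true
        System.out.println(new MultiSchemaModeSketch(Mode.DISABLED).acceptDifferentSchema()); // false
    }
}
```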


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-16 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324964075
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaHash.java
 ##
 @@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.util.Optional;
+
+@EqualsAndHashCode
+public class SchemaHash {
 
 Review comment:
   Sure, it's fine to do it in this PR. I originally intended to do this in follow-up PRs to keep this one as small as possible.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324424636
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
 ##
 @@ -107,10 +107,19 @@
  */
 TypedMessageBuilder<T> newMessage();
 
+/**
+ * Create a new message builder with schema specified explicitly.
+ *
+ * @return a typed message builder that can be used to construct the message to be sent through this producer
+ * @since 2.5.0
+ * @see #newMessage()
+ */
+TypedMessageBuilder<T> newMessage(Schema<T> schema);
 
 Review comment:
   I have hidden this interface and left a TODO.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-14 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324424596
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 ##
 @@ -434,4 +434,18 @@
  * @return the producer builder instance
  */
 ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
+
+/**
+ * Option to control whether to enable multiple schema mode,
+ * which by default is enabled automatically if this feature is used.
+ * If enabled, the producer can send a message with a schema different from the one specified when it was created;
+ * otherwise an invalid message exception is thrown if the producer tries to send a message
+ * with a different schema.
+ *
+ * @param switchOption
+ * indicates to enable, disable, or automatically enable multiple schema mode
+ * @return the producer builder instance
+ * @since 2.5.0
+ */
+ProducerBuilder<T> withMultiSchema(SwitchOption switchOption);
 
 Review comment:
   fixed, PTAL


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-13 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324405477
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
 ##
 @@ -107,10 +107,19 @@
  */
 TypedMessageBuilder<T> newMessage();
 
+/**
+ * Create a new message builder with schema specified explicitly.
+ *
+ * @return a typed message builder that can be used to construct the message to be sent through this producer
+ * @since 2.5.0
+ * @see #newMessage()
+ */
+TypedMessageBuilder<T> newMessage(Schema<T> schema);
 
 Review comment:
   I'll leave a TODO, or just hide this interface since it's still incomplete.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-13 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324405110
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 ##
 @@ -434,4 +434,18 @@
  * @return the producer builder instance
  */
 ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
+
+/**
+ * Option to control whether to enable multiple schema mode,
+ * which by default is enabled automatically if this feature is used.
+ * If enabled, the producer can send a message with a schema different from the one specified when it was created;
+ * otherwise an invalid message exception is thrown if the producer tries to send a message
+ * with a different schema.
+ *
+ * @param switchOption
+ * indicates to enable, disable, or automatically enable multiple schema mode
+ * @return the producer builder instance
+ * @since 2.5.0
+ */
+ProducerBuilder<T> withMultiSchema(SwitchOption switchOption);
 
 Review comment:
   Great. I was aiming for a generic name, but yours is good; I'll fix it.


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-13 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r324404970
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
 ##
 @@ -107,10 +107,19 @@
  */
 TypedMessageBuilder<T> newMessage();
 
+/**
+ * Create a new message builder with schema specified explicitly.
+ *
+ * @return a typed message builder that can be used to construct the message to be sent through this producer
+ * @since 2.5.0
+ * @see #newMessage()
+ */
+TypedMessageBuilder<T> newMessage(Schema<T> schema);
 
 Review comment:
   @sijie yes, you are right. I'd like to change the signature in a follow-up PR to keep this one easier to review, since the change would break more internal methods of the implementation class.
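The TODO quoted in this thread ("same parameterized type `T` with producer for the moment") means the explicit-schema builder can only produce the producer's own message type. The thread does not show which signature was eventually chosen. One common way to lift the restriction is a method-level type parameter, so each message can use its own schema and value type. The sketch below illustrates that idea with hypothetical `MiniSchema`, `MiniProducer` and `MiniBuilder` types, not Pulsar's interfaces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class NewMessageSignatureSketch {
    // Hypothetical mini-API for illustration only; not Pulsar's interfaces.
    interface MiniSchema<T> {
        byte[] encode(T value);
    }

    static final class MiniProducer<T> {
        final List<byte[]> sent = new ArrayList<>();
        private final MiniSchema<T> schema;

        MiniProducer(MiniSchema<T> schema) {
            this.schema = schema;
        }

        // Bound to the producer's T, as the TODO describes for the moment.
        MiniBuilder<T> newMessage() {
            return newMessage(schema);
        }

        // Method-level <V>: each message may pick its own schema and value type.
        <V> MiniBuilder<V> newMessage(MiniSchema<V> msgSchema) {
            return new MiniBuilder<>(this, msgSchema);
        }
    }

    static final class MiniBuilder<V> {
        private final MiniProducer<?> producer;
        private final MiniSchema<V> msgSchema;
        private V value;

        MiniBuilder(MiniProducer<?> producer, MiniSchema<V> msgSchema) {
            this.producer = producer;
            this.msgSchema = msgSchema;
        }

        MiniBuilder<V> value(V v) {
            this.value = v;
            return this;
        }

        void send() {
            producer.sent.add(msgSchema.encode(value));
        }
    }

    public static void main(String[] args) {
        MiniSchema<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        MiniSchema<Integer> int32 = i -> ByteBuffer.allocate(4).putInt(i).array();
        MiniProducer<String> producer = new MiniProducer<>(utf8);
        producer.newMessage().value("hello").send();
        producer.newMessage(int32).value(42).send(); // compiles only because of <V>
        System.out.println(producer.sent.size()); // 2
    }
}
```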


[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

2019-09-11 Thread GitBox
yittg commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r323301079
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##
 @@ -430,6 +438,95 @@ public void sendAsync(Message message, SendCallback callback) {
 }
 }
 
+private boolean fillMessageSchema(MessageMetadata.Builder msgMetadataBuilder,
+  Schema msgSchema,
+  SendCallback callback) {
+if (msgSchema == schema) {
+schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+return true;
+}
+byte[] schemaVersion;
+try {
+schemaVersion = schemaCache.computeIfAbsent(
+SchemaHash.of(msgSchema), (hash) -> {
+SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+.map(Schema::getSchemaInfo)
+.filter(si -> si.getType().getValue() > 0)
+.orElse(Schema.BYTES.getSchemaInfo());
+try {
+return getOrCreateSchemaAsync(schemaInfo).get();
+} catch (Throwable t) {
+log.warn("[{}][{}] GetOrCreateSchema error", topic, producerName, t);
+callback.sendComplete(PulsarClientException.unwrap(t));
+throw new RuntimeException(t);
+}
+});
+} catch (RuntimeException e) {
+return false;
+}
+conf.setMultiSchemaEnabled(true);
 
 Review comment:
   @congbobo184 thanks for your reply. This line enables multiple schema mode automatically. The flag matters on some code paths, such as checking whether messages can be added to the same batch: if multiple schema mode is not enabled, there is no need to check whether they share the same schema.
   I'll make this clearer in a following commit.
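The flag matters for batching because schema versions only need comparing when multiple schema mode is on. With the mode off, every message carries the producer's own schema, so the comparison can be skipped. The sketch below models a batch container that follows this rule. `BatchSchemaCheckSketch` is hypothetical; schema versions are raw `byte[]` values compared with `Arrays.equals`.

```java
import java.util.Arrays;

// Hypothetical batch container; names are illustrative, not Pulsar's.
final class BatchSchemaCheckSketch {
    private final boolean multiSchemaEnabled;
    private byte[] batchSchemaVersion; // schema version shared by the messages already batched
    private int size;

    BatchSchemaCheckSketch(boolean multiSchemaEnabled) {
        this.multiSchemaEnabled = multiSchemaEnabled;
    }

    // With multiple schema mode off every message uses the producer's schema, so the
    // version comparison is skipped; with it on, one batch holds one schema version.
    boolean canAdd(byte[] msgSchemaVersion) {
        if (!multiSchemaEnabled || size == 0) {
            return true;
        }
        return Arrays.equals(batchSchemaVersion, msgSchemaVersion);
    }

    void add(byte[] msgSchemaVersion) {
        if (size == 0) {
            batchSchemaVersion = msgSchemaVersion;
        }
        size++;
    }

    public static void main(String[] args) {
        BatchSchemaCheckSketch batch = new BatchSchemaCheckSketch(true);
        batch.add(new byte[] {1});
        System.out.println(batch.canAdd(new byte[] {1})); // true
        System.out.println(batch.canAdd(new byte[] {2})); // false: flush, then start a new batch
    }
}
```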

