I am not sure I understand your assumptions regarding VCORES and CPU usage and/or number of tuples emitted per second. Operators and ports in Apex have high thread affinity, so with a single operator deployed into a container, it does not matter how many VCORES that container has.
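
A rough illustration of that point, in plain Java rather than Apex code: the single-thread executor below is only a stand-in for the one thread that drives an operator's process() calls, and the class and method names are hypothetical. However many cores the machine reports, all of the work lands on the same thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: not Apex code. One worker thread stands in for the
// thread that calls process() on a single deployed operator.
final class ThreadAffinitySketch {

    // Runs `tuples` units of work on one worker thread and returns the
    // names of every thread that handled at least one unit.
    static Set<String> threadsUsed(int tuples) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService operatorThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < tuples; i++) {
            operatorThread.execute(() -> {
                names.add(Thread.currentThread().getName());
            });
        }
        operatorThread.shutdown();
        try {
            operatorThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return names;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        Set<String> used = threadsUsed(10_000);
        System.out.println("cores available: " + cores
                + ", threads that did work: " + used.size());
    }
}
```

Extra cores only help once there is more than one thread to schedule, for example when the operator is partitioned.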

Thank you,

Vlad

On 5/1/18 11:57, Vivek Bhide wrote:
Hi,

I have created a new operator, extending BaseOperator, that converts Avro
messages to JSON messages. I'm unable to use AvroToPojoOperator due to the
complexity of the Avro structure. I'm trying to analyze its performance.

When I use:

VCORES    MEMORY (MB)    Tuples Emitted MA    CPU%
1         2048           3100                 98%
1         4096           3200                 98%
2         2048           3100                 98%
2         4096           3200                 98%

*All the Emitted and CPU% values above are approximate, based on simple tests.
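
As a back-of-the-envelope reading of these numbers, the sketch below converts throughput and CPU% into CPU time per tuple. It assumes that CPU% is measured against a single core, which the original measurements do not state, and the class and method names are hypothetical. Under that assumption, both throughput values work out to roughly 0.3 ms of CPU per tuple.

```java
// Illustrative arithmetic only. Assumes cpuPercent is the share of ONE core.
final class PerTupleCost {

    // Returns CPU microseconds spent per tuple for the given throughput.
    static double cpuMicrosPerTuple(double tuplesPerSecond, double cpuPercent) {
        double cpuSecondsPerSecond = cpuPercent / 100.0;
        return cpuSecondsPerSecond * 1_000_000.0 / tuplesPerSecond;
    }

    public static void main(String[] args) {
        // The two throughput values reported in the table, both at 98% CPU.
        System.out.printf("3100 tuples/s: %.1f us per tuple%n",
                cpuMicrosPerTuple(3100, 98));
        System.out.printf("3200 tuples/s: %.1f us per tuple%n",
                cpuMicrosPerTuple(3200, 98));
    }
}
```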

From the above analysis, increasing the number of cores or the memory didn't
affect CPU% or tuples emitted. My assumption was that increasing VCORES would
reduce the CPU% or maybe increase the tuples emitted. I also assume that the
operator is not actually using 2GB of memory, since it is a CPU-intensive
operation, so I understand why increasing memory doesn't affect tuples
emitted. But increasing VCORES should reduce the CPU% and increase tuples
emitted, as more cores are now available for processing.

Is my assumption wrong? If so, what can be done to reduce CPU%, apart from
partitioning the operator? Below is my code for the custom operator.


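import java.util.HashMap;
import java.util.Map;

import javax.validation.constraints.NotNull;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class CustomAvroToStringOperator extends BaseOperator {

    @NotNull
    private String schemaRegistryUrl;

    // Transient: rebuilt in setup() rather than serialized with the operator.
    private transient KafkaAvroDeserializer avroDeserializer;

    public String getSchemaRegistryUrl() {
        return schemaRegistryUrl;
    }

    public void setSchemaRegistryUrl(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();

    public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>() {
        @Override
        public void process(byte[] tuple) {
            // Deserialize the Avro payload and emit its string form.
            outputPort.emit(avroDeserializer.deserialize("topic", tuple).toString());
        }
    };

    @Override
    public void setup(Context.OperatorContext context) {
        avroDeserializer = setUpSchemaRegistry();
        super.setup(context);
    }

    // Builds a deserializer backed by a schema registry client that caches up to 1000 schemas.
    private KafkaAvroDeserializer setUpSchemaRegistry() {
        final Map<String, String> config = new HashMap<>();
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
        return new KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl, 1000), config);
    }
}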

Regards
Vivek



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
