To utilize multiple VCORES, multiple threads would be required. Why not
increase the parallelism (partition count)?
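A minimal plain-Java sketch of why more partitions (threads) are needed before extra cores help. This is not Apex code. An `ExecutorService` stands in for the runtime, and `PartitionSketch`, `runPartitioned` and `expensiveConvert` are hypothetical names. Each partition runs on its own thread, so CPU-bound work can spread over several cores. Tuples are split by index here for simplicity; the real platform routes tuples to partitions by its own rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class PartitionSketch {

    // Hypothetical stand-in for the CPU-bound per-tuple conversion.
    static long expensiveConvert(int tuple) {
        long h = tuple;
        for (int i = 0; i < 10_000; i++) {
            h = h * 31 + i;
        }
        return h;
    }

    /**
     * Splits the tuples across partitionCount partitions and runs each
     * partition on its own thread. Returns the number of tuples processed
     * and records the names of the threads that did the work.
     */
    static long runPartitioned(int[] tuples, int partitionCount, Set<String> threadsSeen) {
        ExecutorService pool = Executors.newFixedThreadPool(partitionCount);
        AtomicLong processed = new AtomicLong();
        AtomicLong checksum = new AtomicLong(); // uses the results so the work is not discarded
        List<Future<?>> futures = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            final int partition = p;
            futures.add(pool.submit(() -> {
                threadsSeen.add(Thread.currentThread().getName());
                long local = 0;
                long count = 0;
                // This partition handles every partitionCount-th tuple.
                for (int i = partition; i < tuples.length; i += partitionCount) {
                    local += expensiveConvert(tuples[i]);
                    count++;
                }
                checksum.addAndGet(local);
                processed.addAndGet(count);
            }));
        }
        try {
            for (Future<?> f : futures) {
                f.get(); // wait for the partition and surface any failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        int[] tuples = new int[20_000];
        for (int i = 0; i < tuples.length; i++) {
            tuples[i] = i;
        }
        for (int partitions : new int[] {1, 2, 4}) {
            Set<String> threads = ConcurrentHashMap.newKeySet();
            long start = System.nanoTime();
            long n = runPartitioned(tuples, partitions, threads);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("partitions=%d threads=%d tuples=%d time=%dms%n",
                    partitions, threads.size(), n, ms);
        }
    }
}
```

On a machine with at least four cores, the reported time should shrink roughly as the partition count grows, while `partitions=1` always runs on a single thread however many cores exist. In an actual Apex application the partition count would be set through the operator's PARTITIONER attribute rather than a thread pool.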

Thanks,
Thomas


On Tue, May 1, 2018 at 8:29 PM, Vlad Rozov wrote:

> I am not sure I understand your assumptions regarding VCORES and CPU usage
> and/or the number of tuples emitted per second. Operators and ports in Apex
> have high thread affinity, so with a single operator deployed into a
> container, it does not matter how many VCORES that container has.
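A rough plain-Java model of the thread-affinity point, under stated assumptions: a thread pool sized to the container's VCORES stands in for the container, and one submitted task plays the single operator thread. This is not Apex's deployment code; `SingleOperatorSketch` and `threadsUsed` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleOperatorSketch {

    /**
     * Runs one "operator" loop on a pool sized to the given number of
     * simulated VCORES and returns how many distinct threads executed
     * the per-tuple work.
     */
    static int threadsUsed(int vcores, int tupleCount) {
        ExecutorService container = Executors.newFixedThreadPool(vcores);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            container.submit(() -> {
                for (int i = 0; i < tupleCount; i++) {
                    // Every simulated process() call happens on the same thread.
                    threads.add(Thread.currentThread().getName());
                }
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            container.shutdown();
        }
        return threads.size();
    }

    public static void main(String[] args) {
        for (int vcores : new int[] {1, 2, 4}) {
            System.out.println("vcores=" + vcores
                    + " threadsUsed=" + threadsUsed(vcores, 10_000));
        }
    }
}
```

Each line reports `threadsUsed=1`: the extra pool threads, like extra VCORES, stay idle because there is only one operator thread to run.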
>
> Thank you,
>
> Vlad
>
>
> On 5/1/18 11:57, Vivek Bhide wrote:
>
>> Hi,
>>
>> I have created a new operator that converts Avro messages to JSON messages
>> by extending BaseOperator. I'm unable to use AvroToPojoOperator due to the
>> complexity of the Avro structure. I'm trying to analyze its performance.
>>
>> When I use:
>>
>> VCORES   MEMORY (MB)   Tuples Emitted (MA)   CPU%
>> 1        2048          3100                  98%
>> 1        4096          3200                  98%
>> 2        2048          3100                  98%
>> 2        4096          3200                  98%
>>
>> *All the above Tuples Emitted and CPU% values are approximations based on simple tests.
>>
>> From the above analysis, increasing the number of cores or the memory didn't
>> affect CPU% or tuples emitted. My assumption was that increasing VCORES would
>> reduce the CPU% or maybe increase the number of emitted tuples. I assume the
>> operator is not actually using 2GB of memory, as it's a CPU-intensive
>> operation, so I understand why more memory doesn't affect tuples emitted.
>> But increasing VCORES should reduce the CPU% and increase tuples emitted, as
>> more cores are now available for processing.
>>
>> Is my assumption wrong? If so, what can be done to reduce CPU% apart
>> from partitioning the operator? Below is my code for the custom operator.
>>
>>
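>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import javax.validation.constraints.NotNull;
>>
>> import com.datatorrent.api.Context;
>> import com.datatorrent.api.DefaultInputPort;
>> import com.datatorrent.api.DefaultOutputPort;
>> import com.datatorrent.common.util.BaseOperator;
>>
>> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
>> import io.confluent.kafka.serializers.KafkaAvroDeserializer;
>> import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
>>
>> public class CustomAvroToStringOperator extends BaseOperator {
>>
>>     @NotNull
>>     private String schemaRegistryUrl;
>>
>>     // Created in setup(); transient so it is not checkpointed with the operator state.
>>     private transient KafkaAvroDeserializer avroDeserializer;
>>
>>     public String getSchemaRegistryUrl() {
>>         return schemaRegistryUrl;
>>     }
>>
>>     public void setSchemaRegistryUrl(String schemaRegistryUrl) {
>>         this.schemaRegistryUrl = schemaRegistryUrl;
>>     }
>>
>>     public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
>>
>>     public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>() {
>>         @Override
>>         public void process(byte[] tuple) {
>>             // Deserialize the Avro payload (schema fetched via the registry)
>>             // and emit its string (JSON-like) representation.
>>             outputPort.emit(avroDeserializer.deserialize("topic", tuple).toString());
>>         }
>>     };
>>
>>     @Override
>>     public void setup(Context.OperatorContext context) {
>>         avroDeserializer = setUpSchemaRegistry();
>>         super.setup(context);
>>     }
>>
>>     private KafkaAvroDeserializer setUpSchemaRegistry() {
>>         final Map<String, String> config = new HashMap<>();
>>         config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
>>         // Return GenericRecord instances rather than generated specific classes.
>>         config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
>>         // 1000 is the capacity of the client's local schema cache.
>>         return new KafkaAvroDeserializer(
>>                 new CachedSchemaRegistryClient(schemaRegistryUrl, 1000), config);
>>     }
>> }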
>>
>> Regards
>> Vivek
>>
>>
>>
>> --
>> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
>>
>
>
