To utilize multiple VCORES, multiple threads would be required. Why not increase the parallelism (partition count)?
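
To illustrate the point, here is a minimal plain-Java sketch, not Apex code. It assumes a hypothetical `convert` method standing in for the CPU-bound Avro-to-JSON step and a hypothetical `processPartitioned` helper that spreads tuples round-robin over several worker threads, much as partitions of an operator would each run on their own thread. All names here are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only: no Apex classes are used here.
final class PartitionSketch {

    // Hypothetical stand-in for the CPU-bound Avro-to-JSON conversion.
    static String convert(byte[] tuple) {
        return new String(tuple, StandardCharsets.UTF_8).toUpperCase();
    }

    // Splits the input round-robin across `partitions` worker threads and
    // returns how many tuples each partition converted.
    static int[] processPartitioned(List<byte[]> tuples, int partitions) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                final int partition = p;
                Callable<Integer> task = () -> {
                    int converted = 0;
                    // Each worker only touches its own share of the tuples.
                    for (int i = partition; i < tuples.size(); i += partitions) {
                        convert(tuples.get(i));
                        converted++;
                    }
                    return converted;
                };
                results.add(pool.submit(task));
            }
            int[] counts = new int[partitions];
            for (int p = 0; p < partitions; p++) {
                counts[p] = results.get(p).get(); // wait for each partition to finish
            }
            return counts;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> tuples = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tuples.add(("tuple-" + i).getBytes(StandardCharsets.UTF_8));
        }
        int[] counts = processPartitioned(tuples, 4);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + " converted " + counts[p] + " tuples");
        }
    }
}
```

With one partition all the work stays on a single thread, so extra cores sit idle; with four partitions the same work is spread over four threads. In Apex itself you would not manage threads by hand: the partition count is normally set through the operator's PARTITIONER attribute (for example with StatelessPartitioner), so check the Apex documentation for the exact property name for your version.
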

Thanks,
Thomas

On Tue, May 1, 2018 at 8:29 PM, Vlad Rozov <[email protected]> wrote:

> I am not sure I understand your assumptions regarding VCORES and CPU usage
> and/or number of tuples emitted per second. Operators and ports in Apex
> have high thread affinity, so with a single operator deployed into a
> container, it does not matter how many VCORES that container has.
>
> Thank you,
>
> Vlad
>
>
> On 5/1/18 11:57, Vivek Bhide wrote:
>
>> Hi,
>>
>> I have created a new operator which converts an Avro message to a JSON
>> message by extending the base operator. I'm unable to use
>> AvroToPojoOperator due to the complexity of the Avro structure. I'm
>> trying to analyze its performance.
>>
>> When I use:
>>
>> VCORES   MEMORY   Tuples Emitted MA   CPU%
>> 1        2048     3100                98%
>> 1        4096     3200                98%
>> 2        2048     3100                98%
>> 2        4096     3200                98%
>>
>> *All the above Emitted and CPU% values are approximate, based on simple
>> tests.
>>
>> From the above analysis, increasing the number of cores or the memory
>> didn't affect CPU or tuples emitted. My assumption was that increasing
>> VCORES would reduce the CPU% or maybe increase emitted tuples. One of my
>> assumptions is that the operator is not actually using 2GB of memory, as
>> it's a CPU-intensive operation, so I get that an increase of memory
>> doesn't really affect tuples emitted. But an increase in VCORES should
>> reduce the CPU% and increase tuples emitted, as more cores are now
>> available for processing.
>>
>> Is my above assumption wrong? If so, what can be done to reduce CPU% apart
>> from partitioning the operator? Below is my code for the custom operator.
>>
>>
>> public class CustomAvroToStringOperator extends BaseOperator {
>>
>>     @NotNull
>>     private String schemaRegistryUrl;
>>
>>     private transient KafkaAvroDeserializer avroDeserializer;
>>
>>     public String getSchemaRegistryUrl() {
>>         return schemaRegistryUrl;
>>     }
>>
>>     public void setSchemaRegistryUrl(String schemaRegistryUrl) {
>>         this.schemaRegistryUrl = schemaRegistryUrl;
>>     }
>>
>>     public final transient DefaultOutputPort<String> outputPort =
>>             new DefaultOutputPort<>();
>>
>>     public final transient DefaultInputPort<byte[]> inputPort =
>>             new DefaultInputPort<byte[]>() {
>>         @Override
>>         public void process(byte[] tuple) {
>>             outputPort.emit(
>>                     avroDeserializer.deserialize("topic", tuple).toString());
>>         }
>>     };
>>
>>     @Override
>>     public void setup(Context.OperatorContext context) {
>>         avroDeserializer = setUpSchemaRegistry();
>>         super.setup(context);
>>     }
>>
>>     private KafkaAvroDeserializer setUpSchemaRegistry() {
>>         final Map<String, String> config = new HashMap<>();
>>         config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>                 schemaRegistryUrl);
>>         config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>                 "false");
>>         return new KafkaAvroDeserializer(
>>                 new CachedSchemaRegistryClient(schemaRegistryUrl, 1000),
>>                 config);
>>     }
>> }
>>
>> Regards
>> Vivek
>>
>>
>>
>> --
>> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
>>
