Re: Kafka Offset Commit
Hi Bharath,

Thank you for the detailed explanation. My code does not use Samza directly. We use Beam's Samza Runner, and I use Beam's KafkaIO. When I checked the internals of Beam's KafkaIO, I saw that it uses the Kafka consumer client in assign mode. So even though Kafka does not assign the partitions, the Beam code still uses a Kafka consumer group and reads the data from each partition itself. I also checked the Samza Runner code, and it looks like it runs Beam's code.

Am I missing something else?

Thanks

On Wed, Aug 11, 2021 at 3:37 PM Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:

> Hi Talat,
>
> This is expected behavior, since Samza uses the low-level Kafka consumer
> instead of the high-level consumer. As a result, offset management is done
> by Samza and does not leverage the offset management that the Kafka
> consumer provides by default.
>
> Additionally, the Kafka consumer group does not apply to Samza either,
> since Samza manages the assignment of Kafka partitions to tasks and does
> not rely on the Kafka high-level consumer to assign partitions to
> different consumers.
>
> Hope that answers your question.
>
> Thank you,
> Bharath
Re: Kafka Offset Commit
Hi Talat,

This is expected behavior, since Samza uses the low-level Kafka consumer instead of the high-level consumer. As a result, offset management is done by Samza and does not leverage the offset management that the Kafka consumer provides by default.

Additionally, the Kafka consumer group does not apply to Samza either, since Samza manages the assignment of Kafka partitions to tasks and does not rely on the Kafka high-level consumer to assign partitions to different consumers.

Hope that answers your question.

Thank you,
Bharath

On Tue, Aug 10, 2021 at 9:41 PM Talat Uyarer wrote:

> Thank you rayman. But my question is about what I see when I check the
> Kafka consumer group of the job: I don't see any offset movement. I chose
> to store checkpoints on the file system. Do you think that is why my
> job's consumer group does not move its offsets?
Re: Kafka Offset Commit
Thank you rayman. But my question is about what I see when I check the Kafka consumer group of the job: I don't see any offset movement. I chose to store checkpoints on the file system. Do you think that is why my job's consumer group does not move its offsets?

On Tue, Aug 10, 2021, 9:32 PM rayman preet wrote:

> Hi Talat,
>
> In your job.properties, task.checkpoint.factory is set to
> FileSystemCheckpointManagerFactory and not
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory.
> That is why it is writing checkpoints to the filesystem (with the
> location controlled by task.checkpoint.path).
>
> https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html
> has details on the configs we need to add to enable checkpointing to
> Kafka for a job.
>
> thanks
>
> --
> thanks
> rayman
Re: Kafka Offset Commit
Hi Talat,

In your job.properties, task.checkpoint.factory is set to FileSystemCheckpointManagerFactory and not org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory. That is why it is writing checkpoints to the filesystem (with the location controlled by task.checkpoint.path).

https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html has details on the configs we need to add to enable checkpointing to Kafka for a job.

thanks

On Tue, Aug 10, 2021 at 5:03 PM Talat Uyarer wrote:

> Hi Samza Community,
>
> This is my first email. Forgive my lack of knowledge about Samza. I am
> running a test job in my environment. I run it in local mode, and my job
> is processing data, but somehow it does not commit offsets on the Kafka
> side. I use the Apache Beam Samza runner.
>
> My pipeline simply reads from Kafka and writes to a GCS bucket. Do you
> have any idea where I should look to debug this issue?
>
> This is my job.properties file:
>
> app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
> job.config.rewriters=env-config
> job.config.rewriter.env-config.class=org.apache.samza.config.EnvironmentConfigRewriter
> job.default.system=filereader
> systems.filereader.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> job.container.thread.pool.size=300
> job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
> task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
> task.checkpoint.path=/home/checkpoints
>
> Thanks for your help in advance.
>
> Talat

--
thanks
rayman
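
For reference, the change rayman describes might look roughly like the fragment below. It is a minimal sketch, not a verified configuration for this job. It assumes the checkpoint topic should live on the Kafka system the posted job.properties already calls "filereader", and that the test cluster has a single broker, hence the replication factor of 1. The broker connection settings for that system are not shown and would still be needed, as described in the linked checkpointing documentation.

```properties
# Sketch only: replace the two filesystem checkpoint lines
# (task.checkpoint.factory and task.checkpoint.path) with Kafka checkpointing.
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

# Assumption: reuse the Kafka system defined in the posted job.properties.
task.checkpoint.system=filereader

# Assumption: single-broker local test cluster; use a higher value elsewhere.
task.checkpoint.replication.factor=1
```

Even with this change, Samza would write its checkpoints to a checkpoint topic of its own rather than committing them as consumer group offsets, so, as Bharath notes elsewhere in the thread, the job's consumer group may still show no offset movement.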