Re: Running v1.7.0 locally
Hey, Malcolm, Thanks for reporting this issue. Could you open a JIRA to track that? Best! -Yi On Mon, Aug 29, 2022 at 5:53 PM Malcolm McFarland wrote: > Hey folks, > > I've recently been attempting to upgrade our legacy application from Samza > 1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the > application with this command: > > ./bin/run-app.sh --config-path=path/to/file.properties > > Starting in 1.6.0, this doesn't seem to work. As far as I can tell, the > application is starting fully up without errors and then is simply shutting > down, once again without error. Afaict it runs fine on YARN. Does Samza > v1.6.0+ support running local processes? I've tried this on both OS X and > Ubuntu, using Java 1.8. > > Here are the relevant portions of the properties file: > > task.class=com.cavulus.task.SimpleLegacyTask > job.factory.class=org.apache.samza.job.local.ThreadJobFactory > job.default.system=kafka > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory > job.name=simple-legacy-task > task.inputs=kafka.event-input > > ...plus serdes, ZooKeeper configuration, etc, etc. Here are the last few > lines of logging output: > > 2022-08-29 17:19:42,842 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Sending metadata request > (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null) > 2022-08-29 17:19:42,843 INFO [org.apache.kafka.clients.Metadata] > Cluster ID: fwnjhL2kQayFxN0xpatT-g > 2022-08-29 17:19:42,843 DEBUG [org.apache.kafka.clients.Metadata] > Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g, > nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = > localhost:9092 (id: 0 rack: null)) > 2022-08-29 17:19:42,843 DEBUG > [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream > simple-legacy-task-broadcast-stream has partitions [Partition(topic = > simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas = > [0], isr = [0], offlineReplicas = [])] > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Initiating connection to node localhost:9092 > (id: 0 rack: null) > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.bytes-sent > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.bytes-received > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] > Added sensor with name node-0.latency > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.network.Selector] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972, > SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0 > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API > versions. > 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Initiating API versions fetch from node 0. > 2022-08-29 17:19:42,845 DEBUG [org.apache.kafka.clients.NetworkClient] > [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, > groupId=simple-legacy-task-1] Recorded API versions for node 0: > (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8], > ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6], > LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], > UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 > [usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5 > [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5 > [usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2 > [usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3 > [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 > [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 > [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to > 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], > OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to > 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 > [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to > 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 > [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to > 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], > AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
Running v1.7.0 locally
Hey folks, I've recently been attempting to upgrade our legacy application from Samza 1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the application with this command: ./bin/run-app.sh --config-path=path/to/file.properties Starting in 1.6.0, this doesn't seem to work. As far as I can tell, the application is starting fully up without errors and then is simply shutting down, once again without error. Afaict it runs fine on YARN. Does Samza v1.6.0+ support running local processes? I've tried this on both OS X and Ubuntu, using Java 1.8. Here are the relevant portions of the properties file: task.class=com.cavulus.task.SimpleLegacyTask job.factory.class=org.apache.samza.job.local.ThreadJobFactory job.default.system=kafka systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory job.name=simple-legacy-task task.inputs=kafka.event-input ...plus serdes, ZooKeeper configuration, etc, etc. Here are the last few lines of logging output: 2022-08-29 17:19:42,842 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null) 2022-08-29 17:19:42,843 INFO [org.apache.kafka.clients.Metadata] Cluster ID: fwnjhL2kQayFxN0xpatT-g 2022-08-29 17:19:42,843 DEBUG [org.apache.kafka.clients.Metadata] Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null)) 2022-08-29 17:19:42,843 DEBUG [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream simple-legacy-task-broadcast-stream has partitions [Partition(topic = simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])] 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating connection to node localhost:9092 (id: 0 rack: null) 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.network.Selector] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API versions. 2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating API versions fetch from node 0. 2022-08-29 17:19:42,845 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Recorded API versions for node 0: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5 [usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1],