Re: integrate Camus and Hive?
e.g. file produced by the Camus job: /user/[hive.user]/output/partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/

Bhavesh, how do you get Camus to write into a directory hierarchy like this? Is it reading the partition values from your messages' timestamps?

On Mar 11, 2015, at 11:29, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Yang,

We do this today, Camus to Hive (without Avro), with plain old tab-separated log lines. We use the hive -f command to add dynamic partitions to the Hive table. A bash shell script adds the time buckets to the HIVE table before the Camus job runs:

for partition in ${@//\//,}; do echo ALTER TABLE ${env:TABLE_NAME} ADD IF NOT EXISTS PARTITION ($partition); done | hive -f

e.g. file produced by the Camus job: /user/[hive.user]/output/partition_month_utc=2015-03/partition_day_utc=2015-03-11/partition_minute_bucket=2015-03-11-02-09/

The above adds the Hive dynamic partitions before the Camus job runs. It works, and you can have any schema:

CREATE EXTERNAL TABLE IF NOT EXISTS ${env:TABLE_NAME} (
    SOME Table FIELDS...
)
PARTITIONED BY (
    partition_month_utc STRING,
    partition_day_utc STRING,
    partition_minute_bucket STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS SEQUENCEFILE
LOCATION '${env:TABLE_LOCATION_CAMUS_OUTPUT}';

I hope this helps! You will have to construct the Hive query according to the partitions you define.

Thanks,
Bhavesh

On Wed, Mar 11, 2015 at 7:24 AM, Andrew Otto ao...@wikimedia.org wrote:

Hive provides the ability to provide custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore.

I tried this yesterday, and as far as I can tell it doesn't work with a custom partition layout. At least not with external tables. MSCK REPAIR TABLE reports that there are directories in the table's location that are not partitions of the table, but it wouldn't actually add the partition unless the directory layout matched Hive's default (key1=value1/key2=value2, etc.)

On Mar 9, 2015, at 17:16, Pradeep Gollakota pradeep...@gmail.com wrote:

If I understood your question correctly, you want to be able to read the output of Camus in Hive and be able to know the partition values. If my understanding is right, you can do so by using the following. Hive provides the ability to provide custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore. Take a look at this SO post: http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern Does that help?

On Mon, Mar 9, 2015 at 1:42 PM, Yang tedd...@gmail.com wrote:

I believe many users like us would export the output from Camus as a Hive external table. But the dir structure of Camus is like YYYY/MM/DD/xx while Hive generally expects /year=YYYY/month=MM/day=DD/xx if you define that table to be partitioned by (year, month, day). Otherwise you'd have to add those partitions created by Camus through a separate command. But in the latter case, would a Camus job create > 1 partitions? How would we find out the YYYY/MM/DD values from outside? Well, you could always do something with hadoop dfs -ls and then grep the output, but it's kind of not clean.

Thanks
Yang
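For concreteness, the statement Bhavesh's loop adds for the example bucket shown above would look roughly like the following (the table name is a placeholder; note that string partition values need quoting in HiveQL):

ALTER TABLE camus_logs ADD IF NOT EXISTS PARTITION (
    partition_month_utc='2015-03',
    partition_day_utc='2015-03-11',
    partition_minute_bucket='2015-03-11-02-09'
);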
Re: Does consumer support combination of whitelist and blacklist topic filtering
Tao,

In MM people can pass in consumer configs, in which people can specify consumption topics, either in regular topic-list format or as a whitelist / blacklist. So I think it already does what you need?

Guozhang

On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com wrote:

Thank you guys for answering. I think it would be good if we could pass a customised TopicCount (I think this is the interface that whitelist and blacklist implement, if I am not mistaken) to MM to achieve a similar thing.

On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com wrote:

Hi Tao,

Unfortunately MM does not support whitelist / blacklist at the same time, and you have to choose either one upon initialization. As for your case, I think it can be captured by some regex that excludes nothing else but 10, but I do not know the exact expression.

Guozhang

On Tue, Mar 10, 2015 at 7:58 AM, tao xiao xiaotao...@gmail.com wrote:

I actually mean if we can achieve this in mirror maker.

On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com wrote:

Hi,

I have a use case where I need to consume a list of topics with names matching the pattern topic.*, except for one, topic.10. Is there a way that I can combine the use of whitelist and blacklist so that I can achieve something like "accept all topics matching the regex topic.* but exclude topic.10"?

--
Regards,
Tao

--
Guozhang
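For what it's worth, a hedged sketch of such an expression: the whitelist is a Java regular expression, so a negative lookahead can carve out the one topic (assuming the topics are literally named topic.1, topic.2, ... and only topic.10 should be skipped):

topic\.(?!10$).*

Passed to mirror maker this would look like --whitelist 'topic\.(?!10$).*'. Note that topic.100 would still match, because the lookahead only rejects an exact "10" followed by end of name.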
Re: createMessageStreams vs createMessageStreamsByFilter
Hi James,

What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues according to how the streams were constructed, where each queue may be consumed by a different thread. And the queues are actually bounded. Now say there are two queues that are getting data from the same fetcher F, consumed by two different user threads A and B. If thread A for some reason gets slow / hangs consuming data from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now?

Guozhang

On Tue, Mar 10, 2015 at 5:15 PM, James Cheng jch...@tivo.com wrote:

Hi,

Sorry to bring up this old thread, but my question is about this exact thing. Guozhang, you said:

A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.

You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed. createMessageStreamsByFilter(*C = 1) (single stream) would have the same problem, just worse.

Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through?

What about createMessageStreams(AC = 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer the same problem, where if one partition has no messages, the application would not receive messages from the other partitions?

Thanks,
-James

On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com wrote:

The new consumer will be released in 0.9, which is targeted for the end of this quarter.

On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com wrote:

Do you know when the new consumer API will be publicly available?

On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com wrote:

Yes, it can get stuck. For example, if AC and BC are processed by two different processes and the AC processor gets stuck, AC messages will fill up in the consumer's buffer and eventually prevent the fetcher thread from putting more data into it; the fetcher thread will be blocked on that and not be able to fetch BC. This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs.

Guozhang

On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com wrote:

Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter(*C = 3), since threads are shared among topics, there may be a situation where all 3 threads get stuck with topic AC (e.g. the topic is empty, which will hold the connecting threads when consumer.timeout.ms=-1), and hence there is no thread to serve topic BC. Do you think this situation will happen?

On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote:

I was not clear before .. for createMessageStreamsByFilter each matched topic will have num-threads, but shared: i.e. there will be a total of num-threads created, but each thread will be responsible for fetching all matched topics. A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.

Guozhang

On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com wrote:

Guozhang,

Do you mean that each regex-matched topic owns the number of threads that get passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics each of which has 2 partitions, then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.

TopicFilter filter = new Whitelist(".*");
int threadTotal = 2;
List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreamsByFilter(filter, threadTotal);

But what I observed from the
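(For context, the connector used in the snippet above would have been constructed roughly as follows; this is a hedged fragment rather than the poster's actual code, and the ZooKeeper address and group id are placeholders.)

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Properties;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "demo-group");
// -1 means a stream's iterator blocks forever when it has no data, as discussed above
props.put("consumer.timeout.ms", "-1");
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));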
Re: High Level Consumer Example in 0.8.2
That example still works; the high-level consumer interface hasn't changed. There is a new high-level consumer on the way and an initial version has been checked into trunk, but it won't be ready to use until 0.9.

On Wed, Mar 11, 2015 at 9:05 AM, ankit tyagi ankittyagi.mn...@gmail.com wrote:

Hi All,

We are upgrading our kafka client version from 0.8.0 to 0.8.2. Is there any document for the high-level kafka consumer with multiple threads, like https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example, for this newer version?

--
Thanks,
Ewen
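For convenience, a condensed sketch of that wiki pattern against the 0.8.x high-level consumer; the topic name, group id, ZooKeeper address and thread count below are placeholders, not values from the thread:

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HighLevelConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "demo-group");
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for 3 streams of the topic; each stream is drained by its own thread.
        int numThreads = 3;
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", numThreads));

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (final KafkaStream<byte[], byte[]> stream : streams.get("my-topic")) {
            pool.submit(new Runnable() {
                public void run() {
                    // The iterator blocks until messages arrive (consumer.timeout.ms = -1 by default).
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> msg = it.next();
                        System.out.println("partition " + msg.partition() + " offset " + msg.offset());
                    }
                }
            });
        }
    }
}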
Re: Idle/dead producer connections on broker
Hmm, this sounds like a serious bug. I do remember we had some ticket reporting similar issues before, but I cannot find it now. Let me dig a bit deeper later. BTW, could you try out the 0.8.2 broker version and see if this is still easily reproducible, i.e. start a bunch of producers to send data for a while, and then terminate them?

Guozhang

On Tue, Mar 10, 2015 at 1:00 PM, Allen Wang aw...@netflix.com.invalid wrote:

Hello,

We are using Kafka 0.8.1.1 on the broker and the 0.8.2 producer on the client. After running for a few days, we have found that there are way too many open file descriptors on the broker side. When we compare the connections on the client side, we find some connections are already gone on the client but still exist on the broker. There are also connections on the broker where the producer instances are already terminated.

We then did a netstat -o and found that the connections on the broker side do not have keep-alive enabled (the timer shows off):

tcp6 0 0 kafka-xyz:7101 ip-a-b-c-d:33471 ESTABLISHED off (0.00/0/0)

We suspect that because there is no keep-alive on the broker, there is no probing on the idle connections and therefore no connection clean-up. There is a default 2-hour TCP keep-alive set at the OS level on both sides:

net.ipv4.tcp_keepalive_time = 7200

On the producer side, keep-alive is enabled on the connection:

tcp6 0 0 ip-a-b-c-d:33471 kafka-xyz.:7101 ESTABLISHED keepalive (975.50/0/0)

Is there any way to clean up the idle producer connections on the broker side? Does keep-alive help clean up the idle connections?

Thanks,
Allen

--
Guozhang
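For anyone trying to reproduce this, the observations above come from standard Linux tooling; a hedged sketch (port 7101 is just the broker port from the thread):

# Kernel keep-alive defaults: idle time (s), probe interval (s), probe count
sysctl net.ipv4.tcp_keepalive_time net.ipv4.tcp_keepalive_intvl net.ipv4.tcp_keepalive_probes

# Per-connection timers; "off" in the timer column means neither keep-alive
# nor retransmission timers are armed for that socket
netstat -tno | grep 7101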
Re: integrate Camus and Hive?
Hi Ad,

You have to implement a custom partitioner, and you will also have to create whatever path you want (based on the message, e.g. a log line timestamp, or however you choose to build the directory hierarchy from each message). You will need to implement your own Partitioner class: https://github.com/linkedin/camus/blob/master/camus-api/src/main/java/com/linkedin/camus/etl/Partitioner.java and use the configuration etl.partitioner.class=CLASSNAME; then you can organize the output any way you like.

I hope this helps.

Thanks,
Bhavesh

On Wed, Mar 11, 2015 at 8:36 AM, Andrew Otto ao...@wikimedia.org wrote:

Bhavesh, how do you get Camus to write into a directory hierarchy like this? Is it reading the partition values from your messages' timestamps?
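As a concrete (hypothetical) example of the etl.partitioner.class configuration mentioned above, the Camus job properties would carry something like:

# camus.properties (excerpt); the class name is a placeholder for your own implementation
etl.partitioner.class=com.example.camus.UTCLogPartitioner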
Re: integrate Camus and Hive?
Hi Andrew,

I would say Camus is generic enough already (but you can propose this to the Camus team). Here is sample code and the methods you can use to create any path or directory structure, and then create a corresponding Hive table schema for it:

public class UTCLogPartitioner extends Partitioner {

    @Override
    public String encodePartition(JobContext context, IEtlKey key) {
        long outfilePartitionMs = EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(context) * 60000L;
        return "" + DateUtils.getPartition(outfilePartitionMs, key.getTime());
    }

    @Override
    public String generatePartitionedPath(JobContext context, String topic, String brokerId, int partitionId, String encodedPartition) {
        StringBuilder sb = new StringBuilder();
        sb.append("Create your HDFS custom path here");
        return sb.toString();
    }
}

Thanks,
Bhavesh

On Wed, Mar 11, 2015 at 10:42 AM, Andrew Otto ao...@wikimedia.org wrote:

Thanks. Do you have this partitioner implemented? Perhaps it would be good to try to get this into Camus as a built-in option. HivePartitioner? :)

-Ao
Examples of kafka based architectures?
Hi all,

In December Adrian Cockcroft presented some big names' distributed architectures in his talk "State of the Art in Microservices" at dockercon. For each he put tooling/configuration/discovery/routing/observability on top, and then underneath datastores, orchestration and development. One can see some examples in slides 51 and later here: http://fr.slideshare.net/adriancockcroft/dockercon-state-of-the-art-in-microservices

Regarding Kafka, I feel like some similar schemes would be very welcome, either in general or in the case of LinkedIn (or other big names using it). Obviously, at least IMHO, routing is tackled, but all the rest is of interest, since there are interdependencies. On top of that, it would provide some examples of where to go for newcomers like me.

So if ever some of you are able to present such schemes, well, please do. Really.

Best
Joseph
[ANNOUNCEMENT] Apache Kafka 0.8.2.1 Released
The Apache Kafka community is pleased to announce the release of Apache Kafka 0.8.2.1. The 0.8.2.1 release fixes 4 critical issues in 0.8.2.0. All of the changes in this release can be found here: https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system rethought as a distributed commit log.

** Fast = A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.

** Scalable = Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.

** Durable = Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

** Distributed by Design = Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

You can download the release from: http://kafka.apache.org/downloads.html

We welcome your help and feedback. For more information on how to report problems, and to get involved, visit the project website at http://kafka.apache.org/

Thanks,
Jun
Re: integrate Camus and Hive?
Thanks. Do you have this partitioner implemented? Perhaps it would be good to try to get this into Camus as a built-in option. HivePartitioner? :)

-Ao

On Mar 11, 2015, at 13:11, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Ad,

You have to implement a custom partitioner, and you will also have to create whatever path you want (based on the message, e.g. a log line timestamp, or however you choose to build the directory hierarchy from each message). You will need to implement your own Partitioner class: https://github.com/linkedin/camus/blob/master/camus-api/src/main/java/com/linkedin/camus/etl/Partitioner.java and use the configuration etl.partitioner.class=CLASSNAME; then you can organize the output any way you like.

I hope this helps.

Thanks,
Bhavesh
Re: createMessageStreams vs createMessageStreamsByFilter
The fetcher thread is per-broker: there is at least one fetcher thread per broker, and a fetcher thread sends that broker a single fetch request asking for all of its partitions. So if A, B, C are on the same broker, the fetcher thread is still able to fetch data from B and C even though A returns no data. The same logic applies across different brokers.

On Thu, Mar 12, 2015 at 6:25 AM, James Cheng jch...@tivo.com wrote:

Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would cause other threads to not receive data.

What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?

Thanks,
-James
Out of Disk Space - Infinite loop
We have 3 DCs and created a 5-node Kafka cluster in each DC, with the 3 DCs connected using MirrorMaker for replication. We were conducting performance testing using the Kafka producer performance tool to load 100 million rows into 7 topics. We expected that data would be loaded evenly across the 7 topics, but 4 topics got loaded with ~2 million messages and the remaining 3 topics were loaded with 90 million messages. The nodes that were leaders of those 3 topics ran out of disk space and went down. We tried to bring back these 2 nodes by doing the following:

1. Stopped the Kafka service
2. Deleted a couple of topics that were taking up too much space, i.e. /var/kafka/logs/{topic$}/, after which the file system showed 47% available
3. Brought the Kafka nodes back

As soon as the nodes were back, we started observing the file system growing, and in 15 minutes the mount point became full again. The deleted topics got recreated and are taking up space again. kafka.log shows many of the following messages, and ultimately the node goes down. We don't need to recover data now; we would just like to bring the nodes back. What are the steps to bring back these nodes?

[2015-03-11 20:52:36,323] INFO Rolled new log segment for 'dc2-perf-topic5-0' in 3 ms. (kafka.log.Log)
[2015-03-11 15:58:07,321] INFO [Kafka Server 1021124614], started (kafka.server.KafkaServer)
[2015-03-11 15:58:07,882] INFO Completed load of log dc2-perf-topic5-0 with log end offset 0 (kafka.log.Log)
[2015-03-11 15:58:07,900] INFO Created log for partition [dc2-perf-topic5,0] in /var/kafka/log with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807, delete.retention.ms - 360, index.interval.bytes - 4096, retention.bytes - -1, cleanup.policy - delete, segment.ms - 60480, max.message.bytes - 112, flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5, retention.ms - 60480}. (kafka.log.LogManager)
[2015-03-11 15:58:07,914] INFO Completed load of log dc2-perf-topic2-0 with log end offset 0 (kafka.log.Log)
[2015-03-11 15:58:07,916] INFO Created log for partition [dc2-perf-topic2,0] in /var/kafka/log with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824, flush.ms - 9223372036854775807, delete.retention.ms - 360, index.interval.bytes - 4096, retention.bytes - -1, cleanup.policy - delete, segment.ms - 60480, max.message.bytes - 112, flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5, retention.ms - 60480}. (kafka.log.LogManager)
[2015-03-11 15:58:07,935] INFO Completed load of log dc2-perf-topic9-0 with log end offset 0 (kafka.log.Log)

SP Naidu
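For reference, the 0.8.x broker settings that bound how much log data stays on disk look like this; a hedged sketch, and the values below are purely illustrative rather than a recommendation from the thread:

# server.properties (excerpt) -- illustrative values only
log.retention.hours=24            # delete log segments older than this
log.retention.bytes=53687091200   # size cap per partition (50 GiB here); -1 disables the cap
log.segment.bytes=1073741824      # roll segments at 1 GiB so old data can actually be deleted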
Re: createMessageStreams vs createMessageStreamsByFilter
On Mar 11, 2015, at 9:12 AM, Guozhang Wang wangg...@gmail.com wrote:

Hi James,

What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues according to how the streams were constructed, where each queue may be consumed by a different thread. And the queues are actually bounded. Now say there are two queues that are getting data from the same fetcher F, consumed by two different user threads A and B. If thread A for some reason gets slow / hangs consuming data from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now?

Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would cause other threads to not receive data.

What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?

Thanks,
-James
Re: How replicas catch up the leader
Hi,

@Jiangjie Qin, this is the related info from controller.log:

[2015-03-11 10:54:11,962] ERROR [Controller 0]: Error completing reassignment of partition [ad_click_sts,3] (kafka.controller.KafkaController)
kafka.common.KafkaException: Partition [ad_click_sts,3] to be reassigned is already assigned to replicas 0,1. Ignoring request for partition reassignment
at kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:585)

It seems like Kafka ignores the kafka-reassign-partitions.sh command. The JSON file used in the command is:

{"version":1,"partitions":[{"topic":"ad_click_sts","partition":3,"replicas":[0,1]}]}

The partition has in practice lost its in-sync replica:

Topic: ad_click_sts  Partition: 3  Leader: 0  Replicas: 0,1  Isr: 0

Regards
sy.pan

On Mar 11, 2015, at 12:32, Jiangjie Qin j...@linkedin.com.INVALID wrote:

It looks like in your case it is because broker 1 somehow missed a controller LeaderAndIsrRequest for [ad_click_sts,4]. So the zkVersion would be different from the value stored in ZooKeeper from then on. Therefore broker 1 failed to update the ISR. In this case you have to bounce the broker to fix it. From what you posted, it looks like both broker 0 and broker 1 are having this issue. So the question is how both brokers could have missed a controller LeaderAndIsrRequest. Is there anything interesting in controller.log?

Jiangjie (Becket) Qin

On 3/10/15, 8:33 PM, sy.pan shengyi@gmail.com wrote:

@tao xiao and Jiangjie Qin, thank you very much.

I tried to run kafka-reassign-partitions.sh, but the issue still exists… this is the log info:

[2015-03-11 11:00:40,086] ERROR Conditional update of path /brokers/topics/ad_click_sts/partitions/4/state with data {"controller_epoch":23,"leader":1,"version":1,"leader_epoch":35,"isr":[1,0]} and expected version 564 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/ad_click_sts/partitions/4/state (kafka.utils.ZkUtils$)
[2015-03-11 11:00:40,086] INFO Partition [ad_click_sts,4] on broker 1: Cached zkVersion [564] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

Finally, I had to restart the Kafka node and the Isr problem was fixed. Is there any better way?

Regards
sy.pan

On Mar 11, 2015, at 03:34, Jiangjie Qin j...@linkedin.com.INVALID wrote:

This looks like a leader broker somehow did not respond to a fetch request from the follower. It may be because the broker was too busy. If that is the case, Xiao's approach could help - reassign partitions or re-elect leaders to balance the traffic among brokers.

Jiangjie (Becket) Qin

On 3/9/15, 8:31 PM, sy.pan shengyi@gmail.com wrote:

Hi, tao xiao and Jiangjie Qin

I have encountered the same issue; my node had just recovered from a high-load problem (caused by another application). This is what kafka-topics shows:

Topic:ad_click_sts  PartitionCount:6  ReplicationFactor:2  Configs:
Topic: ad_click_sts  Partition: 0  Leader: 1  Replicas: 1,0  Isr: 1
Topic: ad_click_sts  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0
Topic: ad_click_sts  Partition: 2  Leader: 1  Replicas: 1,0  Isr: 1
Topic: ad_click_sts  Partition: 3  Leader: 0  Replicas: 0,1  Isr: 0
Topic: ad_click_sts  Partition: 4  Leader: 1  Replicas: 1,0  Isr: 1
Topic: ad_click_sts  Partition: 5  Leader: 0  Replicas: 0,1  Isr: 0

ReplicaFetcherThread info extracted from the Kafka server.log:

[2015-03-09 21:06:05,450] ERROR [ReplicaFetcherThread-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 7331; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [ad_click_sts,5] - PartitionFetchInfo(6149699,1048576),[ad_click_sts,3] - PartitionFetchInfo(6147835,1048576),[ad_click_sts,1] - PartitionFetchInfo(6235071,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
...
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
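(For completeness, the kafka-reassign-partitions.sh invocation referenced earlier in this thread would look roughly like the following; the ZooKeeper address and file name are placeholders.)

# assuming the JSON above was saved as reassign.json
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassign.json --execute

# and its status checked with
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassign.json --verify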
Re: Database Replication Question
Hi, Pete,

Thank you for sharing your experience with me! sendfile and mmap are common system calls, but it sounds like we still need to consider at least the file-system differences when deploying Kafka. Cross-platform support is a headache. : )

Best wishes,

Xiao Li

On Mar 10, 2015, at 10:28 AM, Pete Wright pwri...@rubiconproject.com wrote:

On 03/09/15 22:56, Xiao wrote:

Hi, Jay,

Thank you! The Kafka document says "Kafka should run well on any unix system." I assume it includes the major two Unix versions, IBM AIX and HP-UX. Right?

FWIW I have had good success running Kafka under load on FreeBSD. I was using OpenJDK-8, and ZFS as my underlying filesystem (which supports lz4 block-level compression). FreeBSD supports the sendfile() system call much like Linux does, and if AIX and HP-UX also support this you may be OK. It's been a while since I've used those Unix systems, so YMMV there. The one suggestion for AIX would be to use IBM's JDK/JRE, as I am sure it has been optimized for Power processors. Obviously this precludes doing extensive load and failover testing before even considering running it in production - but I did not run into any issues using FreeBSD in my lab.

Cheers,
-pete

--
Pete Wright
Systems Architect
Rubicon Project
pwri...@rubiconproject.com
310.309.9298
Re: Database Replication Question
Hi Jiangjie and Guozhang,

Native z/OS support is what I need. I wrote a few native z/OS applications using C/C++ before. Based on my current understanding, I have two alternatives:

1) Write native z/OS producers in C/C++ to feed data to Kafka clusters that are running on LUW servers. I assume the mainframe and the Kafka clusters are running in the same DC.

2) Go through an additional component (like IBM MQ) to send data from z/OS to LUW, and implement Java producers on the LUW servers that read the MQ messages and feed the Kafka clusters.

Do you think they can work? Any potential issues based on your experience? Before starting implementation of prototypes, I have to understand whether Kafka is the right solution for enterprise customers, if we can ignore the security issues in the initial stage. (I already saw that the security-related features will be released next month.)

Thank you for sharing the background with me, Guozhang! It helps me understand the reason why asynchronous fsync was introduced. However, I still think the original design could be kept; users could then choose the better option based on their use cases. Replication is good for throughput and performance, but it will increase the total operation costs and complicate the design of some Kafka applications. Occasional latency spikes might be OK for some use cases, and some use cases do not have very high throughput requirements. Disk persistence is a fundamental requirement for us. : ( Zero data loss is always assumed; otherwise, it requires a very complicated recovery procedure.

Cross-center replication is not acceptable to most customers. It will reduce both performance and usability. The simplest solution is using dual power supplies in the Kafka deployment. I am not sure if this is enough.

BTW, does the design proposal of "transactional messaging" miss a design change in the log recovery? Recovery checkpoints might be in the middle of multiple in-flight transactions.

Thank you very much!

Xiao Li

On Mar 10, 2015, at 1:01 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote:

Hi Xiao,

For z/OS, do you mean z/VM or native z/OS? For z/VM, it probably will work fine, but for z/OS, I would be surprised if Kafka can run directly on it. I think Guozhang's approach for cross-colo replication is worth trying. One thing to be aware of when deploying a Kafka cluster across colos is that Kafka is not designed to tolerate network partitions - which are more likely to occur when you deploy Kafka across multiple DCs. Another option is to run separate Kafka clusters in different data centers and do double writes to the two separate clusters. But that definitely needs more work to ensure consistency.

Thanks.

Jiangjie (Becket) Qin

On 3/10/15, 8:27 AM, Guozhang Wang wangg...@gmail.com wrote:

Hello Xiao,

The proposed transactional messaging in Kafka is aimed at improving the at-least-once semantics of delivery to exactly-once, i.e. to avoid duplicates. It is not aimed at grouping the fsync of multiple messages into one; for avoiding data loss it is still dependent on data replication. As I understand your situation, you are mainly concerned with tolerating data center power outages via disk persistence, which cannot be easily supported via asynchronous time-based fsync. We used to do synchronized fsync before data replication was introduced, and later switched to async fsync mainly for performance (KAFKA-615 https://issues.apache.org/jira/browse/KAFKA-615), since we saw that disk IO spikes can sometimes cause the whole broker to slow down quite a lot due to processor thread pool sharing.
On the other hand, even with fsync, data loss may still happen upon power outage depending on your file-system journaling settings.

Another alternative for tolerating data center failures is geo-replication: you can try to put your replicas in different data centers, more specifically:

1) you can use one producer to send normal data with ack mode = 1, so you are not waiting on cross-colo replication latency.
2) at each batch boundary, you can use another producer to send a commit message with ack mode = -1, which requires all data before this message to be already replicated, to mimic group-commit behavior.

This will of course still increase the write latency and require a cross-colo ZK setup.

Guozhang

On Mon, Mar 9, 2015 at 10:56 PM, Xiao lixiao1...@gmail.com wrote:

Hi, Jay,

Thank you! The Kafka document says "Kafka should run well on any unix system." I assume it includes the major two Unix versions, IBM AIX and HP-UX. Right?

1. Unfortunately, we aim at supporting all the platforms: Linux, Unix, Windows and especially z/OS. I know z/OS is not easy to support.

2. Fsync per message is very expensive, and fsync per batch will break transaction atomicity. We are looking for transaction-level fsync, which is more efficient. Then, our producers can easily combine multiple small transactions
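To make the two-producer idea above concrete, here is a hedged sketch with the 0.8.2 Java producer; the broker address, topic name and record contents are placeholders, and error handling is omitted:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TwoAckLevelsDemo {
    public static void main(String[] args) throws Exception {
        Properties base = new Properties();
        base.put("bootstrap.servers", "broker1:9092");
        base.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        base.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Properties fast = new Properties();
        fast.putAll(base);
        fast.put("acks", "1");     // leader-only ack for normal data: low latency

        Properties durable = new Properties();
        durable.putAll(base);
        durable.put("acks", "-1"); // wait for all in-sync replicas: used for the batch "commit" marker

        KafkaProducer<byte[], byte[]> normal = new KafkaProducer<>(fast);
        KafkaProducer<byte[], byte[]> commit = new KafkaProducer<>(durable);

        normal.send(new ProducerRecord<byte[], byte[]>("events", "payload".getBytes()));
        // At a batch boundary, block until the marker is acknowledged by all in-sync replicas
        // (the thread's proxy for "everything before it has been replicated").
        commit.send(new ProducerRecord<byte[], byte[]>("events", "commit-marker".getBytes())).get();

        normal.close();
        commit.close();
    }
}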
Consuming messages on the same backend
Hi. I have the following problem: we are building a system that will generate any number of different events published to different topics. Events have an associated client, and a client can express interest in these events. When they do, then for each event we will execute a callback to a remote (http) endpoint.

It is important that we do not overload the remote endpoint, say after a crash when there are a metric ton of unprocessed events waiting to be processed. So I'd like all events for a client to end up on the same consumer and do the throttling there. Is it possible to do this in Kafka?

I know that I can specify the client as a key when producing the events, so that events for a client for that topic all end up on the same partition, but because we have a topic per event type and lots of events, the messages will still be processed by many backends. Creating a new topic that contains all the events from all the other topics would work, I guess. But this feels inelegant, and then you are storing data twice.

Can anyone offer any guidance?
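For reference, the keyed-produce building block mentioned above would look roughly like this with the 0.8.2 Java producer (topic name, broker address and client id are placeholders). It pins one client's events for a given topic to one partition, and therefore to one consumer in the group, but it does not by itself solve the many-topics part of the question:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KeyedEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String clientId = "client-42";  // hypothetical client identifier used as the message key
        // All events keyed by the same client id hash to the same partition of this topic,
        // so a single consumer instance in the group receives them and can throttle its callbacks.
        producer.send(new ProducerRecord<String, String>("client-events", clientId, "{\"event\":\"example\"}"));
        producer.close();
    }
}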
Re: createMessageStreams vs createMessageStreamsByFilter
consumer.timeout.ms only affects how the stream reads data from the internal chunk queue that is used to buffer received data. The actual data fetching is done by another thread, kafka.consumer.ConsumerFetcherThread. The fetcher thread keeps reading data from the broker and putting it into the queue, and the stream keeps polling the queue and passing data back to the consumer if there is any.

So for a case like createMessageStreams(AC = 1), the same stream (which means the same chunk queue) is shared by multiple partitions of topic AC. If one of the partitions has no data, the consumer is still able to read data from the other partitions, as the fetcher thread keeps feeding data from those partitions into the queue. The only situation where the consumer will get stuck is when the fetcher thread is blocked by the network, e.g. high network latency between consumer and broker, or no data coming from the broker. This is because the fetcher thread is implemented using blocking I/O.

On Wed, Mar 11, 2015 at 8:15 AM, James Cheng jch...@tivo.com wrote:

Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through?

What about createMessageStreams(AC = 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer the same problem, where if one partition has no messages, the application would not receive messages from the other partitions?

Thanks,
-James
Re: integrate Camus and Hive?
Hive provides the ability to provide custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore.

I tried this yesterday, and as far as I can tell it doesn't work with a custom partition layout. At least not with external tables. MSCK REPAIR TABLE reports that there are directories in the table's location that are not partitions of the table, but it wouldn't actually add the partition unless the directory layout matched Hive's default (key1=value1/key2=value2, etc.)

On Mar 9, 2015, at 17:16, Pradeep Gollakota pradeep...@gmail.com wrote:

If I understood your question correctly, you want to be able to read the output of Camus in Hive and be able to know the partition values. If my understanding is right, you can do so by using the following. Hive provides the ability to provide custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore. Take a look at this SO post: http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern Does that help?

On Mon, Mar 9, 2015 at 1:42 PM, Yang tedd...@gmail.com wrote:

I believe many users like us would export the output from Camus as a Hive external table. But the dir structure of Camus is like YYYY/MM/DD/xx while Hive generally expects /year=YYYY/month=MM/day=DD/xx if you define that table to be partitioned by (year, month, day). Otherwise you'd have to add those partitions created by Camus through a separate command. But in the latter case, would a Camus job create > 1 partitions? How would we find out the YYYY/MM/DD values from outside? Well, you could always do something with hadoop dfs -ls and then grep the output, but it's kind of not clean.

Thanks
Yang
Re: High Level Consumer Example in 0.8.2
Hi Ewen,

I am using the kafka-clients-0.8.2.0.jar client jar and I don't see the above interface in it. Which dependency do I need to include?

On Wed, Mar 11, 2015 at 10:17 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

That example still works; the high-level consumer interface hasn't changed. There is a new high-level consumer on the way and an initial version has been checked into trunk, but it won't be ready to use until 0.9.

On Wed, Mar 11, 2015 at 9:05 AM, ankit tyagi ankittyagi.mn...@gmail.com wrote:

Hi All,

We are upgrading our kafka client version from 0.8.0 to 0.8.2. Is there any document for the high-level kafka consumer with multiple threads, like https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example, for this newer version?

--
Thanks,
Ewen
Re: High Level Consumer Example in 0.8.2
Ah, I see the confusion now. The kafka-clients jar was introduced only recently and is meant to hold the new clients, which are pure Java implementations cleanly isolated from the server code. The old clients (including what we call the "old consumer", since a new consumer is being developed, but which is really the current consumer) are in the core module. For that you want a kafka_<scala version>.jar. For example, this artifact should work for you: http://search.maven.org/#artifactdetails|org.apache.kafka|kafka_2.10|0.8.2.1|jar

By the way, you probably also want 0.8.2.1 now, since it fixed a few important bugs in 0.8.2.0.

-Ewen

On Wed, Mar 11, 2015 at 10:08 PM, ankit tyagi ankittyagi.mn...@gmail.com wrote:

Hi Ewen,

I am using the kafka-clients-0.8.2.0.jar client jar and I don't see the above interface in it. Which dependency do I need to include?

--
Thanks,
Ewen
Re: High Level Consumer Example in 0.8.2
Hi Ewen,

Just one question: if I want to use the new producer implementation, then I need to include the dependencies below.

1. kafka-clients-0.8.2.0.jar (for the new producer implementation)
2. kafka_2.10-0.8.2.1.jar (for the consumer)

As I see, the package of the producer is different in the two jars, so there won't be any conflicts.

On Thu, Mar 12, 2015 at 10:51 AM, Ewen Cheslack-Postava e...@confluent.io wrote:

Ah, I see the confusion now. The kafka-clients jar was introduced only recently and is meant to hold the new clients, which are pure Java implementations cleanly isolated from the server code. The old clients (including what we call the "old consumer", since a new consumer is being developed, but which is really the current consumer) are in the core module. For that you want a kafka_<scala version>.jar. For example, this artifact should work for you: http://search.maven.org/#artifactdetails|org.apache.kafka|kafka_2.10|0.8.2.1|jar

By the way, you probably also want 0.8.2.1 now, since it fixed a few important bugs in 0.8.2.0.

-Ewen
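Condensed, the coordinates being discussed would be:

org.apache.kafka:kafka-clients:0.8.2.0   (new Java producer, package org.apache.kafka.clients.producer)
org.apache.kafka:kafka_2.10:0.8.2.1      (core module with the old/high-level consumer, packages kafka.consumer and kafka.javaapi.consumer)

As noted above, the new producer and the old consumer live in different packages, so the classes themselves do not overlap.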