The Kafka producer also lets you choose a custom partitioning strategy. You can use the same strategy you used in the consumer.
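The point here is that Kafka only guarantees ordering within a partition. If every record of a sensor goes to the same partition, a consumer reads that sensor's records in the order they were produced. Below is a minimal Python simulation of that idea. It is not Kafka's API, and `partition_for` and `produce` are hypothetical helpers. With the real Java producer, keying each record by `sensor_id` is usually enough, because the default partitioner picks the partition from a hash of the key. A fully custom strategy is plugged in through the `partitioner.class` producer setting.

```python
import zlib
from collections import defaultdict


def partition_for(sensor_id, num_partitions):
    """Pick a partition from a stable hash of the sensor id (hypothetical helper).

    All records of one sensor map to the same partition. zlib.crc32 is used
    because Python's built-in hash() of a str changes between processes.
    """
    return zlib.crc32(sensor_id.encode("utf-8")) % num_partitions


def produce(records, num_partitions):
    """Simulate sending (sensor_id, timestamp) records: each partition is an
    append-only list, mirroring how a Kafka partition is an ordered log."""
    partitions = defaultdict(list)
    for sensor_id, ts in records:
        partitions[partition_for(sensor_id, num_partitions)].append((sensor_id, ts))
    return partitions
```

This only preserves the order in which records were sent. The records quoted below come out of the database unsorted even within one sensor (for `febc005f`, 05:15 is followed by 05:48 and then 05:21). They would therefore still need to be sorted by timestamp per sensor before being produced.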
On Tue, Aug 18, 2015 at 12:19 AM, Alec Lee wrote:
> Hello, thanks for the reply.
>
> Now I am running into a new issue; see:
>
> "c1bdeeb9-0309-4dae-9d6c-56406796528a,febc005f,2013-03-15 05:15:00-07:00,60,0.3480,2013-03-26 18:15:21.173000-07:00,7738739"
> "d2e9128a-2800-4dac-b3bc-e88de2bb6e12,fef00032,2013-03-24 09:12:00-07:00,60,1.4280,2013-03-26 14:48:20.673000-07:00,7515366"
> "46208ecf-cace-4afc-a275-fe285f8b2321,febc005f,2013-03-15 05:48:00-07:00,60,0.3370,2013-03-26 18:15:21.173000-07:00,7738772"
> "c401ba9a-d7ee-4be2-87f4-001bcf006e98,fef00031,2013-03-24 13:03:00-07:00,60,5.6230,2013-03-26 14:48:20.673000-07:00,7515381"
> "bde98248-1432-4a6f-80a1-d60c237b13cd,febc005f,2013-03-15 05:21:00-07:00,60,0.2850,2013-03-26 18:15:21.173000-07:00,7738745"
> "31acf402-fe84-42e6-bfdf-4ab5cb81d77d,fef00032,2013-03-24 08:35:00-07:00,60,1.6810,2013-03-26 14:48:20.673000-07:00,7515287"
> "691d6e6e-fd20-408a-a252-0943804f1f4d,febc005f,2013-03-15 05:47:00-07:00,60,0.3970,2013-03-26 18:15:21.173000-07:00,7738771"
> "5321373e-dabe-4736-827b-7def497e5582,fef00031,2013-03-24 12:17:00-07:00,60,5.6310,2013-03-26 14:48:20.673000-07:00,7515293"
> "08daa911-ac2f-45d5-b712-dee9b0566fc5,febc005f,2013-03-15 05:16:00-07:00,60,0.4660,2013-03-26 18:15:21.173000-07:00,7738740"
> "df02fa18-090a-4b61-8572-d121970be5bf,fef00031,2013-03-24 12:22:00-07:00,60,5.6330,2013-03-26 14:48:20.673000-07:00,7515298"
> "6a06962f-2488-4f02-865a-d7aa7f4d134b,febc005f,2013-03-15 05:30:00-07:00,60,0.1870,2013-03-26 18:15:21.173000-07:00,7738754"
> "9b762264-67c9-4f0c-86b8-554042ff2139,fef00031,2013-03-24 12:33:00-07:00,60,5.6340,2013-03-26 14:48:20.673000-07:00,7515321"
> "829c6fad-5b75-4c62-ad78-db3da652df26,febc005f,2013-03-15 05:20:00-07:00,60,0.2580,2013-03-26 18:15:21.173000-07:00,7738744"
> "1daf5bb7-c0b0-453d-8735-abe83383db18,fef00032,2013-03-24 08:43:00-07:00,60,1.6730,2013-03-26 14:48:20.673000-07:00,7515313"
> "46bb6e02-78bd-42ad-92c2-5d9177e2c71d,febc005f,2013-03-15 05:17:00-07:00,60,0.4390,2013-03-26 18:15:21.173000-07:00,7738741"
> "12a625f9-10c7-4ca1-99ec-6a6224902f75,fef00031,2013-03-24 13:54:00-07:00,60,5.6480,2013-03-26 14:48:21.891000-07:00,7515480"
> "05546fe5-24ed-4d72-8704-264bdece35aa,febc005f,2013-03-15 05:19:00-07:00,60,0.2550,2013-03-26 18:15:21.173000-07:00,7738743"
> "bbed20ad-55b8-4ece-8749-d6ddb17c06f3,fef00031,2013-03-24 13:30:00-07:00,60,5.6390,2013-03-26 14:48:21.891000-07:00,7515432"
> "d64a6113-1995-4c27-96c1-ecaebbb9f0e4,febc005f,2013-03-15 05:18:00-07:00,60,0.2850,2013-03-26 18:15:21.173000-07:00,7738742"
> "2c83dc0e-f7bb-410a-83a7-2184a1cc7b16,fef00031,2013-03-24 13:32:00-07:00,60,5.6380,2013-03-26 14:48:21.891000-07:00,7515440"
> "9061e687-14b4-4346-8c32-ecb9193123b6,febc005f,2013-03-15 05:29:00-07:00,60,0.1980,2013-03-26 18:15:21.173000-07:00,7738753"
> "b243ac92-a2b2-4d05-8f82-b9d3bc9a9928,fef00032,2013-03-24 09:50:00-07:00,60,0.6270,2013-03-26 14:48:21.891000-07:00,7515446"
> "cf12f052-9ef3-45bc-ad45-ff4dc13a46cc,febc005f,2013-03-15 05:24:00-07:00,60,0.2350,2013-03-26 18:15:21.173000-07:00,7738748"
> "addab4e2-afa0-4e92-b2fa-c1c1f90a734c,fef00031,2013-03-24 13:41:00-07:00,60,5.6360,2013-03-26 14:48:21.891000-07:00,7515455"
> "f99610c8-078e-41d3-9948-cf761b287d5d,febc005f,2013-03-15 05:23:00-07:00,60,0.3150,2013-03-26 18:15:21.173000-07:00,7738747"
> "ecf9f8e6-1d98-4ac0-8516-7a972f7397e4,fef00032,2013-03-24 10:00:00-07:00,60,0.1310,2013-03-26 14:48:21.891000-07:00,7515462"
> "807ef56e-8ddb-4233-a906-ee59514bb580,febc005f,2013-03-15 05:22:00-07:00,60,0.3800,2013-03-26 18:15:21.173000-07:00,7738746"
> "cb13705c-9f85-4d21-94c7-70385ccfdd57,fef00032,2013-03-24 10:01:00-07:00,60,0.1310,2013-03-26 14:48:21.891000-07:00,7515469"
> "d335e4b3-1821-41c4-b7d9-baaa469c5a8e,febc005f,2013-03-15 05:28:00-07:00,60,0.2100,2013-03-26 18:15:21.173000-07:00,7738752"
> "757bb70b-414c-4a8a-b515-f3016dc496d0,fef00031,2013-03-24 13:25:00-07:00,60,5.6370,2013-03-26 14:48:21.891000-07:00,7515427"
> "38641aba-5f2e-43d9-8f68-a3a66f6e0762,febc005f,2013-03-15 05:25:00-07:00,60,0.2000,2013-03-26 18:15:21.173000-07:00,7738749"
> "21398d00-01fc-4eae-87cb-fd1c84adb46f,fef00032,2013-03-24 09:19:00-07:00,60,1.4540,2013-03-26 14:48:21.891000-07:00,7515385"
> "ce52ee1f-db8c-4b51-9417-3f3121ef9544,febc005f,2013-03-15 05:27:00-07:00,60,0.2190,2013-03-26 18:15:21.173000-07:00,7738751"
> "4782d60c-41e0-40dd-a8cf-476d191e164a,fef00031,2013-03-24 13:07:00-07:00,60,5.6320,2013-03-26 14:48:21.891000-07:00,7515391"
> "35b1a6e7-a9ea-4422-82bf-02cf60069a44,febc005f,2013-03-15 05:26:00-07:00,60,0.2410,2013-03-26 18:15:21.173000-07:00,7738750"
> "cc5404a6-bcdd-48e9-addc-2cf9d8692573,fef00032,2013-03-24 09:33:00-07:00,60,1.4570,2013-03-26 14:48:21.891000-07:00,7515411"
> "89d639c7-555b-4f73-b24c-c624e77ed79c,febc005f,2013-03-15 05:32:00-07:00,60,0.2070,2013-03-26 18:15:21.173000-07:00,7738756"
> "ec581e25-a4f2-4bf8-b31f-e6ac8872e199,fef00031,2013-03-24 13:18:00-07:00,60,5.6290,2013-03-26 14:48:21.891000-07:00,7515408"
> "c69ca077-19a0-4c62-be30-1cc8894a3bc0,febc005f,2013-03-15 05:46:00-07:00,60,0.2700,2013-03-26 18:15:21.173000-07:00,7738770"
> "d51b4d83-8b9e-43bc-aab3-4696c03fbb94,fef00031,2013-03-24 13:19:00-07:00,60,5.6310,2013-03-26 14:48:21.891000-07:00,7515415"
> "c206ff8a-d1e2-4d69-b368-a1e19ed19692,febc005f,2013-03-15 05:37:00-07:00,60,1.2430,2013-03-26 18:15:21.173000-07:00,7738761"
> "9d7d22bd-4d86-40cd-98bf-63cafd9a8ee4,fef00032,2013-03-24 10:33:00-07:00,60,0.1290,2013-03-26 14:48:23.048000-07:00,7515531"
> "59595f0c-36fc-437d-99ef-9ad7ca0d782a,febc005f,2013-03-15 05:31:00-07:00,60,0.2740,2013-03-26 18:15:21.173000-07:00,7738755"
> "1ec6cb30-8b0e-42a6-8950-a19cfe30413d,fef00031,2013-03-24 14:19:00-07:00,60,8.1590,2013-03-26 14:48:23.048000-07:00,7515535"
> "d78f90b1-d554-4635-8466-2cffae972d80,febc005f,2013-03-15 05:36:00-07:00,60,1.2920,2013-03-26 18:15:21.173000-07:00,7738760"
> "ce39ac81-616f-48b9-8308-a2133fa29d8d,fef00031,2013-03-24 14:26:00-07:00,60,8.1590,2013-03-26 14:48:23.048000-07:00,7515548"
> "cef08ac7-5038-4ced-8577-298e66ce0c73,febc005f,2013-03-15 05:33:00-07:00,60,0.2260,2013-03-26 18:15:21.173000-07:00,7738757"
> "625be8f8-ae97-4956-81c6-6128716b1114,fef00032,2013-03-24 10:49:00-07:00,60,0.1320,2013-03-26 14:48:23.048000-07:00,7515565"
> "acce41e0-f9a7-4c2c-9bb6-ceec28c35ea6,febc005f,2013-03-15 05:34:00-07:00,60,0.6520,2013-03-26 18:15:21.173000-07:00,7738758"
> "c608ae2f-e029-4a64-a88e-9478272841a2,fef00031,2013-03-24 14:32:00-07:00,60,5.6520,2013-03-26 14:48:23.048000-07:00,7515560"
> "eaea1077-4e26-4aad-b598-0a456c9b9fea,febc005f,2013-03-15 05:35:00-07:00,60,1.076,2013-03-26 18:15:21.173000-07:00,7738759"
> "e00bb52e-05c0-46a0-896a-1aca1e6a00d7,fef00031,2013-03-24 14:37:00-07:00,60,8.1500,2013-03-26 14:48:23.048000-07:00,7515571"
> "6bb60b5e-c5cc-4b72-80a0-71b67cc8914a,febc005f,2013-03-15 05:45:00-07:00,60,0.2870,2013-03-26 18:15:21.173000-07:00,7738769"
> "d8982028-653b-41e4-aede-fcec7339237f,fef00032,2013-03-24 10:57:00-07:00,60,0.1300,2013-03-26 14:48:23.048000-07:00,7515579"
> "84ba4344-7172-4c4c-bf29-f3eed8f891cb,febc005f,2013-03-15 05:40:00-07:00,60,0.2760,2013-03-26 18:15:21.173000-07:00,7738764"
> "0d352e88-4956-48bd-93fb-221c5f474d2c,fef00031,2013-03-24 14:14:00-07:00,60,5.6490,2013-03-26 14:48:23.048000-07:00,7515524"
> "554ee0d7-6275-4c61-b6eb-66061000e164,febc005f,2013-03-15 05:39:00-07:00,60,0.2740,2013-03-26 18:15:21.173000-07:00,7738763"
> "808a626d-a811-4f87-9d3e-59fb09a6d6eb,fef00032,2013-03-24 10:15:00-07:00,60,1.061,2013-03-26 14:48:23.048000-07:00,7515495"
> "77fee844-23db-46f8-ada4-c60dd8d2c5e5,febc005f,2013-03-15 05:38:00-07:00,60,0.5210,2013-03-26 18:15:21.173000-07:00,7738762"
> "4004f52e-8abb-4ebe-be21-759e538a9493,fef00032,2013-03-24 10:18:00-07:00,60,1.054,2013-03-26 14:48:23.048000-07:00,7515498"
> "eb9774dc-f6f1-44ab-9bf3-ea75c22ad6c8,febc005f,2013-03-15 05:44:00-07:00,60,0.2960,2013-03-26 18:15:21.173000-07:00,7738768"
> "a57b111a-50ee-4d9d-82f7-2aea237321a6,fef00031,2013-03-24 14:09:00-07:00,60,5.6460,2013-03-26 14:48:23.048000-07:00,7515513"
> "97126051-95a4-403c-97d0-870c5a2652c1,febc005f,2013-03-15 05:41:00-07:00,60,0.2760,2013-03-26 18:15:21.173000-07:00,7738765"
> "4b7745d1-7f21-4107-8239-95590191e6da,fef00031,2013-03-24 14:12:00-07:00,60,5.6530,2013-03-26 14:48:23.048000-07:00,7515516"
> "ec014634-4bb9-488d-a77f-ce58280c5713,febc005f,2013-03-15 05:43:00-07:00,60,0.3850,2013-03-26 18:15:21.173000-07:00,7738767"
> "b9639b7d-dccc-4921-8b03-170ddfff9d0e,fef00032,2013-03-24 11:21:00-07:00,60,1.051,2013-03-26 14:48:24.266000-07:00,7515627"
> "97400c10-116d-4010-86df-8a19f6fcc8d4,febc005f,2013-03-15 05:42:00-07:00,60,0.3860,2013-03-26 18:15:21.173000-07:00,7738766"
> "0fefcbc9-84e1-455b-a6de-5455e2c59a6b,fef00032,2013-03-24 11:22:00-07:00,60,1.042,2013-03-26 14:48:24.266000-07:00,7515628"
> "aee821ee-ac83-40f9-ba14-7b25f80ddcec,febc005f,2013-03-15 06:03:00-07:00,60,0.4510,2013-03-26 18:15:21.173000-07:00,7738787"
> "9ed42e1c-5028-4041-96b4-6019f783d97d,fef00032,2013-03-24 11:04:00-07:00,60,1.3000,2013-03-26 14:48:24.266000-07:00,7515592"
> "43713310-f721-4091-adf6-83ce24a6f0b6,febc005f,2013-03-15 06:05:00-07:00,60,0.3700,2013-03-26 18:15:21.173000-07:00,7738789"
> "1e0e9be5-2536-43c7-af93-55f41c8fadc7,fef00031,2013-03-24 14:59:00-07:00,60,5.6460,2013-03-26 14:48:24.266000-07:00,7515611"
> "743fe996-5bec-477b-9847-021d303e1e56,febc005f,2013-03-15 05:55:00-07:00,60,0.3460,2013-03-26 18:15:21.173000-07:00,7738779"
> "b614fd1d-b36a-4501-a633-0e438d51afff,fef00032,2013-03-24 11:09:00-07:00,60,1.091,2013-03-26 14:48:24.266000-07:00,7515603"
> "a9f12f45-6fda-4768-844f-3fd3950d1047,febc005f,2013-03-15 06:04:00-07:00,60,0.4100,2013-03-26 18:15:21.173000-07:00,7738788"
> "14f49e9c-b551-4244-8c8e-4468408fc15c,fef00031,2013-03-24 15:12:00-07:00,60,5.6450,2013-03-26 14:48:24.266000-07:00,7515636"
> "f6048b84-6d6f-4451-a4f8-3a47b62b3ca2,febc005f,2013-03-15 05:54:00-07:00,60,0.3130,2013-03-26 18:15:21.173000-07:00,7738778"
> "4049b4e7-ce1b-4900-9725-903b62525f0c,fef00031,2013-03-24 15:02:00-07:00,60,5.6490,2013-03-26 14:48:24.266000-07:0
>
> The records above are the data I get from the DB. I retrieve the data from
> the DB and push it into Kafka, and Storm consumes it from there. Apparently
> the Kafka nodes shuffle the data because I have multiple partitions, so I
> assume I will have to group (by sensor) and order (by timestamp) to be able
> to find the missing minutes. What is an efficient way to order the data
> from Kafka?
>
> Thanks,
>
> AL
>
> On Aug 13, 2015, at 3:25 PM, Alec Lee wrote:
>
> Begin forwarded message:
>
> From: Abhishek Agarwal
> Subject: Re: how to use storm to identify the missing records in data stream
> Date: August 11, 2015 at 10:00:29 PM PDT
>
> You can implement a custom field grouping such that there are a total of 60
> bolts. Each bolt with index X receives the data for second X of every
> minute. Each bolt then has to report any missing minutes. If the data does
> not arrive out of order, not much state is needed in memory: you just
> forward the missing minutes (along with the second that bolt is responsible
> for) to another stream. If it can arrive out of order, there are multiple
> strategies with different trade-offs.
>
> On Wed, Aug 12, 2015 at 7:15 AM, Kishore Senji wrote:
>
>> You can have the bolt maintain a sliding window of the events (the last two
>> events, including the current one). If the current event immediately
>> follows the previous event in the queue, the current event becomes the head
>> of the window.
>> Otherwise, you can compute the missing events and emit them before acking
>> the current event and making it the head of the window.
>> For this to work, all events for a given device have to go to the same bolt
>> instance, so to scale the bolt you can partition the stream on
>> sensor/device (or some other dimension) and use the appropriate fields
>> grouping. Another assumption is that events do not arrive out of order.
>> Since each device emits only once a minute (and not concurrently), events
>> from a given device should not be out of order; if they are, you can hold
>> more events in the window and sort them. You would also have to think about
>> fault tolerance: this approach keeps state in memory, and that state is
>> lost if a node dies.
>>
>> On Tue, Aug 11, 2015 at 5:20 PM, Alec Lee wrote:
>>
>>> The timestamps look like:
>>> 2013-03-21 11:43:00-07
>>> 2013-03-21 11:44:00-07
>>> 2013-03-21 11:45:00-07
>>>
>>> On Aug 11, 2015, at 5:05 PM, Alec Lee wrote:
>>>
>>> Thanks, Javier. Here are the data records I am receiving:
>>> id | sensor_id | timestamp | period | current | date_received |
>>>
>>> Basically, as I understand it, each tuple is emitted with all the fields
>>> above, but some records are missing from the sequence of timestamps. For
>>> example, I should receive a record every minute:
>>> 2015-08-11T17:01:49
>>> 2015-08-11T17:01:50
>>> 2015-08-11T17:01:51
>>> 2015-08-11T17:01:52
>>> .
>>> .
>>> .
>>>
>>> However, I may get data like this:
>>> 2015-08-11T17:01:49
>>> 2015-08-11T17:01:50
>>> 2015-08-11T17:01:53
>>>
>>> I must find the two missing records between 01:50 and 01:53, and I will
>>> estimate each missing current value as the average of the 01:50 and 01:53
>>> current values.
>>>
>>> I am not sure if I have explained this clearly. Thanks,
>>>
>>> AL
>>>
>>> On Aug 11, 2015, at 3:35 PM, Javier Gonzalez wrote:
>>>
>>> Just to make sure I'm understanding correctly: do you have a single
>>> stream of sequential ids or multiple streams that need to be interpolated?
>>> Do you receive a stream of ids and emit a stream of timestamped ids?
>>> On Aug 11, 2015 5:34 PM, "Alec Lee" wrote:
>>>
>>>> Hello, all
>>>>
>>>> I have a question about doing analytics with Storm. I have a data stream
>>>> coming in in real time, and each record has a timestamp. Records are
>>>> supposed to be ingested from the devices every 1 second, but we know some
>>>> records are missing: say we get timestamp1, timestamp2 and timestamp5, so
>>>> the timestamp3 and timestamp4 records are missing. How can I identify these
>>>> missing records? I need to find out which records are missing based on the
>>>> sequential timestamps, and estimate each missing value from the last
>>>> record and the next record; I can use their average as the missing value.
>>>> The output of this bolt will then be a consecutive sequence of data with no
>>>> missing records.
>>>>
>>>> Thanks
>>>>
>>>> Al
>
> --
> Regards,
> Abhishek Agarwal

--
Regards,
Abhishek Agarwal
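Below is a minimal Python sketch of the gap-filling approach discussed in the thread. It keeps a per-sensor two-event window (Kishore's suggestion) and fills each gap with the average of the readings on either side (Alec's proposal). It is plain Python rather than a Storm bolt. `parse_record` and `GapFiller` are hypothetical names, and reading the `period` column as the expected interval in seconds is an assumption. The sketch needs Python 3.7+ for `datetime.fromisoformat`. It also assumes each sensor's events arrive in timestamp order, for example via a fields grouping on `sensor_id`.

```python
from datetime import datetime, timedelta


def parse_record(line):
    """Parse one CSV record like those in the thread (hypothetical helper).

    Assumed field order: id, sensor_id, timestamp, period, current,
    date_received, plus a trailing numeric field. Treating period as the
    expected interval in seconds is an assumption.
    """
    fields = line.strip().strip('"').split(",")
    sensor_id = fields[1]
    ts = datetime.fromisoformat(fields[2])  # e.g. "2013-03-15 05:15:00-07:00"
    period = timedelta(seconds=int(fields[3]))
    current = float(fields[4])
    return sensor_id, ts, period, current


class GapFiller:
    """Per-sensor gap detection using a two-event window (previous + current).

    Assumes each sensor's events arrive in timestamp order, e.g. because a
    fields grouping on sensor_id sends them all to the same instance.
    """

    def __init__(self):
        # sensor_id -> (timestamp, current) of the last event seen
        self.prev = {}

    def process(self, sensor_id, ts, period, current):
        """Return (sensor_id, timestamp, current, estimated) tuples: one
        estimated record per missing interval, then the real event."""
        out = []
        if sensor_id in self.prev:
            prev_ts, prev_current = self.prev[sensor_id]
            if ts <= prev_ts:
                # Late or duplicate event: this sketch simply ignores it.
                return out
            # Estimate every missing value as the average of the two neighbours.
            estimate = (prev_current + current) / 2
            missing = prev_ts + period
            while missing < ts:
                out.append((sensor_id, missing, estimate, True))
                missing += period
        out.append((sensor_id, ts, current, False))
        self.prev[sensor_id] = (ts, current)
        return out
```

Take sensor `febc005f` as an example. Feed it the 05:15 record and then the 05:18 record, skipping 05:16 and 05:17. The second call returns two estimated records for 05:16 and 05:17, each carrying the average of 0.3480 and 0.2850, followed by the real 05:18 record. As Kishore points out, this state lives only in memory and is lost if a worker dies. Out-of-order events would also need a larger, sorted buffer instead of being dropped.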
