The Kafka producer also lets you choose a custom partitioning strategy. You
can use the same one you used in the consumer.
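Kafka preserves order only within a single partition. One way to act on this suggestion is to key every record by `sensor_id`, so that all readings for a sensor land in the same partition and are consumed in the order they were produced. The sketch below simulates that routing in plain Python. `partition_for` and `route` are hypothetical helpers, and `zlib.crc32` is a stand-in hash, not the one Kafka's producer uses. The sketch assumes the producer sends each sensor's rows in timestamp order, which would likely mean sorting the DB query results; the sample rows quoted below do not appear to be sorted by timestamp.

```python
import zlib


def partition_for(sensor_id: str, num_partitions: int) -> int:
    """Hypothetical partitioner: a stable hash of the sensor id, modulo the partition count.

    zlib.crc32 is only a stand-in; it is not the hash Kafka's default
    partitioner uses.
    """
    return zlib.crc32(sensor_id.encode("utf-8")) % num_partitions


def route(records, num_partitions):
    """Simulate a producer appending (sensor_id, ts, value) records to partitions.

    Each partition is an append-only list, so the records of one sensor keep
    the order in which they were sent.
    """
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        partitions[partition_for(record[0], num_partitions)].append(record)
    return partitions


# Usage: all "febc005f" readings end up in a single partition, in send order.
sample = [("febc005f", "05:15", 0.348), ("fef00031", "13:03", 5.623),
          ("febc005f", "05:16", 0.466)]
for i, part in enumerate(route(sample, 3)):
    print(i, part)
```

With the real Kafka producer, you may not need a custom partitioner at all. Setting the record key to the sensor id and relying on the default key-hashing partitioner should give the same property: each sensor's records go to a single partition.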

On Tue, Aug 18, 2015 at 12:19 AM, Alec Lee <[email protected]> wrote:

> Hello, thanks for the reply.
>
> Now I am running into some new issues; see below:
>
> "c1bdeeb9-0309-4dae-9d6c-56406796528a,febc005f,2013-03-15 05:15:00-07:00,60,0.3480,2013-03-26 18:15:21.173000-07:00,7738739"
> "d2e9128a-2800-4dac-b3bc-e88de2bb6e12,fef00032,2013-03-24 09:12:00-07:00,60,1.4280,2013-03-26 14:48:20.673000-07:00,7515366"
> "46208ecf-cace-4afc-a275-fe285f8b2321,febc005f,2013-03-15 05:48:00-07:00,60,0.3370,2013-03-26 18:15:21.173000-07:00,7738772"
> "c401ba9a-d7ee-4be2-87f4-001bcf006e98,fef00031,2013-03-24 13:03:00-07:00,60,5.6230,2013-03-26 14:48:20.673000-07:00,7515381"
> "bde98248-1432-4a6f-80a1-d60c237b13cd,febc005f,2013-03-15 05:21:00-07:00,60,0.2850,2013-03-26 18:15:21.173000-07:00,7738745"
> "31acf402-fe84-42e6-bfdf-4ab5cb81d77d,fef00032,2013-03-24 08:35:00-07:00,60,1.6810,2013-03-26 14:48:20.673000-07:00,7515287"
> "691d6e6e-fd20-408a-a252-0943804f1f4d,febc005f,2013-03-15 05:47:00-07:00,60,0.3970,2013-03-26 18:15:21.173000-07:00,7738771"
> "5321373e-dabe-4736-827b-7def497e5582,fef00031,2013-03-24 12:17:00-07:00,60,5.6310,2013-03-26 14:48:20.673000-07:00,7515293"
> "08daa911-ac2f-45d5-b712-dee9b0566fc5,febc005f,2013-03-15 05:16:00-07:00,60,0.4660,2013-03-26 18:15:21.173000-07:00,7738740"
> "df02fa18-090a-4b61-8572-d121970be5bf,fef00031,2013-03-24 12:22:00-07:00,60,5.6330,2013-03-26 14:48:20.673000-07:00,7515298"
> "6a06962f-2488-4f02-865a-d7aa7f4d134b,febc005f,2013-03-15 05:30:00-07:00,60,0.1870,2013-03-26 18:15:21.173000-07:00,7738754"
> "9b762264-67c9-4f0c-86b8-554042ff2139,fef00031,2013-03-24 12:33:00-07:00,60,5.6340,2013-03-26 14:48:20.673000-07:00,7515321"
> "829c6fad-5b75-4c62-ad78-db3da652df26,febc005f,2013-03-15 05:20:00-07:00,60,0.2580,2013-03-26 18:15:21.173000-07:00,7738744"
> "1daf5bb7-c0b0-453d-8735-abe83383db18,fef00032,2013-03-24 08:43:00-07:00,60,1.6730,2013-03-26 14:48:20.673000-07:00,7515313"
> "46bb6e02-78bd-42ad-92c2-5d9177e2c71d,febc005f,2013-03-15 05:17:00-07:00,60,0.4390,2013-03-26 18:15:21.173000-07:00,7738741"
> "12a625f9-10c7-4ca1-99ec-6a6224902f75,fef00031,2013-03-24 13:54:00-07:00,60,5.6480,2013-03-26 14:48:21.891000-07:00,7515480"
> "05546fe5-24ed-4d72-8704-264bdece35aa,febc005f,2013-03-15 05:19:00-07:00,60,0.2550,2013-03-26 18:15:21.173000-07:00,7738743"
> "bbed20ad-55b8-4ece-8749-d6ddb17c06f3,fef00031,2013-03-24 13:30:00-07:00,60,5.6390,2013-03-26 14:48:21.891000-07:00,7515432"
> "d64a6113-1995-4c27-96c1-ecaebbb9f0e4,febc005f,2013-03-15 05:18:00-07:00,60,0.2850,2013-03-26 18:15:21.173000-07:00,7738742"
> "2c83dc0e-f7bb-410a-83a7-2184a1cc7b16,fef00031,2013-03-24 13:32:00-07:00,60,5.6380,2013-03-26 14:48:21.891000-07:00,7515440"
> "9061e687-14b4-4346-8c32-ecb9193123b6,febc005f,2013-03-15 05:29:00-07:00,60,0.1980,2013-03-26 18:15:21.173000-07:00,7738753"
> "b243ac92-a2b2-4d05-8f82-b9d3bc9a9928,fef00032,2013-03-24 09:50:00-07:00,60,0.6270,2013-03-26 14:48:21.891000-07:00,7515446"
> "cf12f052-9ef3-45bc-ad45-ff4dc13a46cc,febc005f,2013-03-15 05:24:00-07:00,60,0.2350,2013-03-26 18:15:21.173000-07:00,7738748"
> "addab4e2-afa0-4e92-b2fa-c1c1f90a734c,fef00031,2013-03-24 13:41:00-07:00,60,5.6360,2013-03-26 14:48:21.891000-07:00,7515455"
> "f99610c8-078e-41d3-9948-cf761b287d5d,febc005f,2013-03-15 05:23:00-07:00,60,0.3150,2013-03-26 18:15:21.173000-07:00,7738747"
> "ecf9f8e6-1d98-4ac0-8516-7a972f7397e4,fef00032,2013-03-24 10:00:00-07:00,60,0.1310,2013-03-26 14:48:21.891000-07:00,7515462"
> "807ef56e-8ddb-4233-a906-ee59514bb580,febc005f,2013-03-15 05:22:00-07:00,60,0.3800,2013-03-26 18:15:21.173000-07:00,7738746"
> "cb13705c-9f85-4d21-94c7-70385ccfdd57,fef00032,2013-03-24 10:01:00-07:00,60,0.1310,2013-03-26 14:48:21.891000-07:00,7515469"
> "d335e4b3-1821-41c4-b7d9-baaa469c5a8e,febc005f,2013-03-15 05:28:00-07:00,60,0.2100,2013-03-26 18:15:21.173000-07:00,7738752"
> "757bb70b-414c-4a8a-b515-f3016dc496d0,fef00031,2013-03-24 13:25:00-07:00,60,5.6370,2013-03-26 14:48:21.891000-07:00,7515427"
> "38641aba-5f2e-43d9-8f68-a3a66f6e0762,febc005f,2013-03-15 05:25:00-07:00,60,0.2000,2013-03-26 18:15:21.173000-07:00,7738749"
> "21398d00-01fc-4eae-87cb-fd1c84adb46f,fef00032,2013-03-24 09:19:00-07:00,60,1.4540,2013-03-26 14:48:21.891000-07:00,7515385"
> "ce52ee1f-db8c-4b51-9417-3f3121ef9544,febc005f,2013-03-15 05:27:00-07:00,60,0.2190,2013-03-26 18:15:21.173000-07:00,7738751"
> "4782d60c-41e0-40dd-a8cf-476d191e164a,fef00031,2013-03-24 13:07:00-07:00,60,5.6320,2013-03-26 14:48:21.891000-07:00,7515391"
> "35b1a6e7-a9ea-4422-82bf-02cf60069a44,febc005f,2013-03-15 05:26:00-07:00,60,0.2410,2013-03-26 18:15:21.173000-07:00,7738750"
> "cc5404a6-bcdd-48e9-addc-2cf9d8692573,fef00032,2013-03-24 09:33:00-07:00,60,1.4570,2013-03-26 14:48:21.891000-07:00,7515411"
> "89d639c7-555b-4f73-b24c-c624e77ed79c,febc005f,2013-03-15 05:32:00-07:00,60,0.2070,2013-03-26 18:15:21.173000-07:00,7738756"
> "ec581e25-a4f2-4bf8-b31f-e6ac8872e199,fef00031,2013-03-24 13:18:00-07:00,60,5.6290,2013-03-26 14:48:21.891000-07:00,7515408"
> "c69ca077-19a0-4c62-be30-1cc8894a3bc0,febc005f,2013-03-15 05:46:00-07:00,60,0.2700,2013-03-26 18:15:21.173000-07:00,7738770"
> "d51b4d83-8b9e-43bc-aab3-4696c03fbb94,fef00031,2013-03-24 13:19:00-07:00,60,5.6310,2013-03-26 14:48:21.891000-07:00,7515415"
> "c206ff8a-d1e2-4d69-b368-a1e19ed19692,febc005f,2013-03-15 05:37:00-07:00,60,1.2430,2013-03-26 18:15:21.173000-07:00,7738761"
> "9d7d22bd-4d86-40cd-98bf-63cafd9a8ee4,fef00032,2013-03-24 10:33:00-07:00,60,0.1290,2013-03-26 14:48:23.048000-07:00,7515531"
> "59595f0c-36fc-437d-99ef-9ad7ca0d782a,febc005f,2013-03-15 05:31:00-07:00,60,0.2740,2013-03-26 18:15:21.173000-07:00,7738755"
> "1ec6cb30-8b0e-42a6-8950-a19cfe30413d,fef00031,2013-03-24 14:19:00-07:00,60,8.1590,2013-03-26 14:48:23.048000-07:00,7515535"
> "d78f90b1-d554-4635-8466-2cffae972d80,febc005f,2013-03-15 05:36:00-07:00,60,1.2920,2013-03-26 18:15:21.173000-07:00,7738760"
> "ce39ac81-616f-48b9-8308-a2133fa29d8d,fef00031,2013-03-24 14:26:00-07:00,60,8.1590,2013-03-26 14:48:23.048000-07:00,7515548"
> "cef08ac7-5038-4ced-8577-298e66ce0c73,febc005f,2013-03-15 05:33:00-07:00,60,0.2260,2013-03-26 18:15:21.173000-07:00,7738757"
> "625be8f8-ae97-4956-81c6-6128716b1114,fef00032,2013-03-24 10:49:00-07:00,60,0.1320,2013-03-26 14:48:23.048000-07:00,7515565"
> "acce41e0-f9a7-4c2c-9bb6-ceec28c35ea6,febc005f,2013-03-15 05:34:00-07:00,60,0.6520,2013-03-26 18:15:21.173000-07:00,7738758"
> "c608ae2f-e029-4a64-a88e-9478272841a2,fef00031,2013-03-24 14:32:00-07:00,60,5.6520,2013-03-26 14:48:23.048000-07:00,7515560"
> "eaea1077-4e26-4aad-b598-0a456c9b9fea,febc005f,2013-03-15 05:35:00-07:00,60,1.076,2013-03-26 18:15:21.173000-07:00,7738759"
> "e00bb52e-05c0-46a0-896a-1aca1e6a00d7,fef00031,2013-03-24 14:37:00-07:00,60,8.1500,2013-03-26 14:48:23.048000-07:00,7515571"
> "6bb60b5e-c5cc-4b72-80a0-71b67cc8914a,febc005f,2013-03-15 05:45:00-07:00,60,0.2870,2013-03-26 18:15:21.173000-07:00,7738769"
> "d8982028-653b-41e4-aede-fcec7339237f,fef00032,2013-03-24 10:57:00-07:00,60,0.1300,2013-03-26 14:48:23.048000-07:00,7515579"
> "84ba4344-7172-4c4c-bf29-f3eed8f891cb,febc005f,2013-03-15 05:40:00-07:00,60,0.2760,2013-03-26 18:15:21.173000-07:00,7738764"
> "0d352e88-4956-48bd-93fb-221c5f474d2c,fef00031,2013-03-24 14:14:00-07:00,60,5.6490,2013-03-26 14:48:23.048000-07:00,7515524"
> "554ee0d7-6275-4c61-b6eb-66061000e164,febc005f,2013-03-15 05:39:00-07:00,60,0.2740,2013-03-26 18:15:21.173000-07:00,7738763"
> "808a626d-a811-4f87-9d3e-59fb09a6d6eb,fef00032,2013-03-24 10:15:00-07:00,60,1.061,2013-03-26 14:48:23.048000-07:00,7515495"
> "77fee844-23db-46f8-ada4-c60dd8d2c5e5,febc005f,2013-03-15 05:38:00-07:00,60,0.5210,2013-03-26 18:15:21.173000-07:00,7738762"
> "4004f52e-8abb-4ebe-be21-759e538a9493,fef00032,2013-03-24 10:18:00-07:00,60,1.054,2013-03-26 14:48:23.048000-07:00,7515498"
> "eb9774dc-f6f1-44ab-9bf3-ea75c22ad6c8,febc005f,2013-03-15 05:44:00-07:00,60,0.2960,2013-03-26 18:15:21.173000-07:00,7738768"
> "a57b111a-50ee-4d9d-82f7-2aea237321a6,fef00031,2013-03-24 14:09:00-07:00,60,5.6460,2013-03-26 14:48:23.048000-07:00,7515513"
> "97126051-95a4-403c-97d0-870c5a2652c1,febc005f,2013-03-15 05:41:00-07:00,60,0.2760,2013-03-26 18:15:21.173000-07:00,7738765"
> "4b7745d1-7f21-4107-8239-95590191e6da,fef00031,2013-03-24 14:12:00-07:00,60,5.6530,2013-03-26 14:48:23.048000-07:00,7515516"
> "ec014634-4bb9-488d-a77f-ce58280c5713,febc005f,2013-03-15 05:43:00-07:00,60,0.3850,2013-03-26 18:15:21.173000-07:00,7738767"
> "b9639b7d-dccc-4921-8b03-170ddfff9d0e,fef00032,2013-03-24 11:21:00-07:00,60,1.051,2013-03-26 14:48:24.266000-07:00,7515627"
> "97400c10-116d-4010-86df-8a19f6fcc8d4,febc005f,2013-03-15 05:42:00-07:00,60,0.3860,2013-03-26 18:15:21.173000-07:00,7738766"
> "0fefcbc9-84e1-455b-a6de-5455e2c59a6b,fef00032,2013-03-24 11:22:00-07:00,60,1.042,2013-03-26 14:48:24.266000-07:00,7515628"
> "aee821ee-ac83-40f9-ba14-7b25f80ddcec,febc005f,2013-03-15 06:03:00-07:00,60,0.4510,2013-03-26 18:15:21.173000-07:00,7738787"
> "9ed42e1c-5028-4041-96b4-6019f783d97d,fef00032,2013-03-24 11:04:00-07:00,60,1.3000,2013-03-26 14:48:24.266000-07:00,7515592"
> "43713310-f721-4091-adf6-83ce24a6f0b6,febc005f,2013-03-15 06:05:00-07:00,60,0.3700,2013-03-26 18:15:21.173000-07:00,7738789"
> "1e0e9be5-2536-43c7-af93-55f41c8fadc7,fef00031,2013-03-24 14:59:00-07:00,60,5.6460,2013-03-26 14:48:24.266000-07:00,7515611"
> "743fe996-5bec-477b-9847-021d303e1e56,febc005f,2013-03-15 05:55:00-07:00,60,0.3460,2013-03-26 18:15:21.173000-07:00,7738779"
> "b614fd1d-b36a-4501-a633-0e438d51afff,fef00032,2013-03-24 11:09:00-07:00,60,1.091,2013-03-26 14:48:24.266000-07:00,7515603"
> "a9f12f45-6fda-4768-844f-3fd3950d1047,febc005f,2013-03-15 06:04:00-07:00,60,0.4100,2013-03-26 18:15:21.173000-07:00,7738788"
> "14f49e9c-b551-4244-8c8e-4468408fc15c,fef00031,2013-03-24 15:12:00-07:00,60,5.6450,2013-03-26 14:48:24.266000-07:00,7515636"
> "f6048b84-6d6f-4451-a4f8-3a47b62b3ca2,febc005f,2013-03-15 05:54:00-07:00,60,0.3130,2013-03-26 18:15:21.173000-07:00,7738778"
> "4049b4e7-ce1b-4900-9725-903b62525f0c,fef00031,2013-03-24 15:02:00-07:00,60,5.6490,2013-03-26 14:48:24.266000-07:0
>
> The left side is the data I get from the DB. I retrieve the data from the DB
> and push it into Kafka, and Storm consumes it from there. Apparently the
> Kafka nodes shuffle the data because I have multiple partitions, so I
> assume I will have to group (by sensor) and order (by timestamp) to be
> able to find the missing minutes. What is an efficient way to order the
> data from Kafka?
>
> thanks
>
> AL
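Sorting an unbounded stream needs some bound on how late a record may arrive. Here is a minimal sketch. It assumes that records can be grouped by `sensor_id` (for example with a fields grouping) and that a record arrives at most `max_delay` out of order. The buffer keeps a per-sensor min-heap and releases a record once it lags the newest timestamp seen for its sensor by at least `max_delay`. `ReorderBuffer`, `max_delay` and `flush` are illustrative names, not Storm or Kafka APIs.

```python
import heapq
from collections import defaultdict
from datetime import datetime, timedelta


class ReorderBuffer:
    """Hypothetical per-sensor reorder buffer (not a Storm or Kafka API).

    Records are held in a min-heap ordered by timestamp and released once they
    are at least `max_delay` older than the newest timestamp seen for their
    sensor. Records arriving later than that are dropped and counted.
    """

    def __init__(self, max_delay: timedelta):
        self.max_delay = max_delay
        self.heaps = defaultdict(list)  # sensor_id -> heap of (ts, value)
        self.newest = {}                # sensor_id -> newest timestamp seen
        self.released = {}              # sensor_id -> last timestamp released
        self.dropped = 0                # records that arrived too late

    def add(self, sensor_id, ts: datetime, value):
        """Buffer one record; return the (ts, value) pairs now safe to emit, oldest first."""
        last = self.released.get(sensor_id)
        if last is not None and ts <= last:
            self.dropped += 1  # too late: an equal or newer record was already emitted
            return []
        heap = self.heaps[sensor_id]
        heapq.heappush(heap, (ts, value))
        self.newest[sensor_id] = max(ts, self.newest.get(sensor_id, ts))
        watermark = self.newest[sensor_id] - self.max_delay
        ready = []
        while heap and heap[0][0] <= watermark:
            ready.append(heapq.heappop(heap))
        if ready:
            self.released[sensor_id] = ready[-1][0]
        return ready

    def flush(self, sensor_id):
        """Release everything still buffered for a sensor, oldest first."""
        heap = self.heaps.pop(sensor_id, [])
        ready = [heapq.heappop(heap) for _ in range(len(heap))]
        if ready:
            self.released[sensor_id] = ready[-1][0]
        return ready
```

A larger `max_delay` tolerates more disorder, but it costs more buffered state and delays the output. Once records leave the buffer in order, the gap detection discussed further down the thread can run on them directly.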
>
> On Aug 13, 2015, at 3:25 PM, Alec Lee <[email protected]> wrote:
>
>
>
> Begin forwarded message:
>
> *From: *Abhishek Agarwal <[email protected]>
> *Subject: **Re: how to use storm to identify the missing records in data
> stream*
> *Date: *August 11, 2015 at 10:00:29 PM PDT
> *To: *[email protected]
> *Reply-To: *[email protected]
>
> You can implement a custom fields grouping such that there are 60 bolt
> tasks in total. The bolt with index X receives the data for second X of
> each minute. Each bolt then has to report any missing minutes. If the data
> does not arrive out of order, not much in-memory state is needed: you just
> forward the missing minutes (along with the second that bolt is responsible
> for) to another stream. If it can arrive out of order, there are multiple
> strategies with different trade-offs.
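Here is a minimal sketch of this idea. It assumes in-order input and one record per second. The grouping picks the task from the record's second within the minute. Each task then only remembers the last record it saw, so it can list the skipped minutes for its own second. Keying that memory by sensor is an added assumption. In Storm, the routing half would live in a custom stream grouping (`CustomStreamGrouping`). Here, `task_for`, `SecondSlotBolt` and `NUM_TASKS` are hypothetical plain-Python stand-ins.

```python
from datetime import datetime, timedelta

NUM_TASKS = 60  # one bolt task per second of the minute, as suggested above


def task_for(ts: datetime) -> int:
    """Grouping decision: route a record to the task for its second-of-minute."""
    return ts.second % NUM_TASKS


class SecondSlotBolt:
    """Hypothetical state for the task responsible for one second of each minute.

    Remembers the last record seen per sensor and reports the minutes skipped
    since then. Assumes records for this task arrive in timestamp order;
    older or duplicate records are ignored.
    """

    def __init__(self, second: int):
        self.second = second
        self.last_seen = {}  # sensor_id -> timestamp of the last record

    def process(self, sensor_id, ts: datetime):
        """Return timestamps of the missing records (same second, skipped minutes)."""
        if ts.second != self.second:
            raise ValueError(f"record for second {ts.second} routed to task {self.second}")
        prev = self.last_seen.get(sensor_id)
        if prev is not None and ts <= prev:
            return []  # out of order or duplicate: ignored in this sketch
        self.last_seen[sensor_id] = ts
        missing = []
        if prev is not None:
            t = prev + timedelta(minutes=1)
            while t < ts:
                missing.append(t)
                t += timedelta(minutes=1)
        return missing
```

Each task holds only one timestamp per sensor, which matches the point above that in-order data needs little state.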
>
> On Wed, Aug 12, 2015 at 7:15 AM, Kishore Senji <[email protected]> wrote:
>
>> You can have the bolt maintain a sliding window of the events (the last two
>> events, including the current one). If the current event immediately
>> follows the previous event in the queue, the current event becomes the head
>> of the window. Otherwise, you can compute the missing events and emit
>> them before acking the current event and making it the head of the window.
>> For this to work, all events for a given device have to go to the same bolt
>> instance, so to scale the bolt you can partition the stream on
>> sensor/device (or some other dimension) and use the appropriate fields
>> grouping. Another assumption is that events do not arrive out of order.
>> Since each device emits only once a minute (not concurrently), events from
>> a given device should not be out of order; if they are, you can hold more
>> events in the window and sort them. You would also have to think about
>> fault tolerance: this approach keeps state in memory, and that state is
>> lost if a node dies.
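The two-event window can be reduced to remembering one head timestamp per device. Here is a minimal sketch. It assumes a fixed expected `interval` between events, in-order delivery per device, and a fields grouping on the device id. `GapDetectingWindow` and `on_event` are hypothetical names. As noted above, this state lives only in memory and is lost if the worker dies.

```python
from datetime import datetime, timedelta


class GapDetectingWindow:
    """Hypothetical bolt state: the head of a two-event window per device."""

    def __init__(self, interval: timedelta):
        self.interval = interval  # expected spacing between consecutive events
        self.head = {}            # device_id -> timestamp of the previous event

    def on_event(self, device_id, ts: datetime):
        """Return timestamps missing between the head and `ts`, then make `ts` the head.

        Events at or before the current head (duplicates or out-of-order
        arrivals) are ignored, because this sketch assumes in-order delivery.
        """
        prev = self.head.get(device_id)
        if prev is not None and ts <= prev:
            return []
        missing = []
        if prev is not None:
            expected = prev + self.interval
            while expected < ts:
                missing.append(expected)
                expected += self.interval
        self.head[device_id] = ts
        return missing
```

Try it on the timestamps from Alec's example (17:01:49, 17:01:50, 17:01:53) with a one-second interval. The third event reports 17:01:51 and 17:01:52 as missing.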
>>
>>
>>
>> On Tue, Aug 11, 2015 at 5:20 PM, Alec Lee <[email protected]> wrote:
>>
>>> the timestamp looks like:
>>> 2013-03-21 11:43:00-07
>>> 2013-03-21 11:44:00-07
>>> 2013-03-21 11:45:00-07
>>>
>>>
>>>
>>> On Aug 11, 2015, at 5:05 PM, Alec Lee <[email protected]> wrote:
>>>
>>> Thanks, Javier. Here are the data records I am receiving:
>>>  id  | sensor_id |  timestamp  |  period | current | date_received |
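To work with these records in a bolt, each comma-separated row has to be parsed into typed fields. Here is a minimal sketch, assuming the column order above. The sample rows in the first message carry a seventh trailing numeric column that this header does not name, so the sketch keeps it as an opaque string; `trailing` is a hypothetical field name. The sketch also strips the double quotes that wrap each row in the dump. `datetime.fromisoformat` (Python 3.7+) accepts timestamps such as `2013-03-15 05:15:00-07:00`.

```python
from datetime import datetime
from typing import NamedTuple


class Reading(NamedTuple):
    id: str
    sensor_id: str
    timestamp: datetime
    period: int
    current: float
    date_received: datetime
    trailing: str  # seventh column seen in the sample rows; meaning unknown


def parse_reading(line: str) -> Reading:
    """Parse one comma-separated record in the column order listed above."""
    # The rows in the dump are wrapped in double quotes; drop them first.
    fields = line.strip().strip('"').split(",")
    rid, sensor, ts, period, current, received, trailing = fields
    return Reading(rid, sensor, datetime.fromisoformat(ts), int(period),
                   float(current), datetime.fromisoformat(received), trailing)
```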
>>>
>>> So basically, as I understand it, each emitted tuple includes all the
>>> fields above, but some records are missing from the sequence of
>>> timestamps. For example, I should receive a record every minute:
>>> 2015-08-11T17:01:49
>>> 2015-08-11T17:01:50
>>> 2015-08-11T17:01:51
>>> 2015-08-11T17:01:52
>>> .
>>> .
>>> .
>>>
>>> However, I may get data like this:
>>> 2015-08-11T17:01:49
>>> 2015-08-11T17:01:50
>>> 2015-08-11T17:01:53
>>>
>>> I must find the two missing records for the timestamps between :50 and
>>> :53, and I will estimate the missing current values by averaging the 01:50
>>> and 01:53 current values.
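The estimation step described here can be sketched as a small helper. Given the records on either side of a gap and the expected spacing, it returns one averaged estimate per missing slot. `fill_missing` is a hypothetical name, and the readings 1.0 and 4.0 in the usage lines are made up for illustration.

```python
from datetime import datetime, timedelta


def fill_missing(prev_ts, prev_value, next_ts, next_value, interval):
    """Return (timestamp, estimate) for every missing slot strictly between two records.

    Each estimate is the average of the readings on either side of the gap;
    `interval` is the expected spacing between consecutive records.
    """
    estimate = (prev_value + next_value) / 2
    filled = []
    t = prev_ts + interval
    while t < next_ts:
        filled.append((t, estimate))
        t += interval
    return filled


# Usage with the example above: records at 17:01:50 and 17:01:53, one per second.
gap = fill_missing(datetime(2015, 8, 11, 17, 1, 50), 1.0,
                   datetime(2015, 8, 11, 17, 1, 53), 4.0,
                   timedelta(seconds=1))
for ts, value in gap:
    print(ts.isoformat(), value)
```

Every slot in a gap gets the same estimate under this scheme. Linear interpolation between the two neighbours would instead give 2.0 and 3.0 for this example; that is a refinement nobody proposed in the thread.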
>>>
>>> I am not sure I have explained this clearly. Thanks,
>>>
>>> AL
>>>
>>> On Aug 11, 2015, at 3:35 PM, Javier Gonzalez <[email protected]> wrote:
>>>
>>> Just to make sure I'm understanding correctly: Do you have a single
>>> stream of sequential ids or multiple streams that need to be interpolated?
>>> Do you receive a stream of ids and emit a stream of timestamped ids?
>>> On Aug 11, 2015 5:34 PM, "Alec Lee" <[email protected]> wrote:
>>>
>>>> Hello, all
>>>>
>>>> I have a question about doing analytics with Storm. I have a data
>>>> stream coming in in real time, and each record has a timestamp. Records
>>>> are supposed to be ingested from the devices every second, but we know
>>>> some are missing: say we receive timestamp1, timestamp2 and timestamp5,
>>>> so the records for timestamp3 and timestamp4 are missing. How can I
>>>> identify these missing records? I need to find out which records are
>>>> missing based on the sequential timestamps, and estimate each missing
>>>> value from the previous and next records; I can use their average as the
>>>> missing value. The output of this bolt should be a continuous sequence of
>>>> data with no missing records.
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Al
>>>
>>>
>>>
>>>
>>
>
>
> --
> Regards,
> Abhishek Agarwal
>
>
>
>


-- 
Regards,
Abhishek Agarwal
