Re: Re: Re: Optimize Order By + Limit Query

2017-03-29 Thread Ravindra Pesala
Hi,

It comes with many limitations:
1. It cannot work for dictionary columns, as there is no guarantee that
dictionary allocation is in sorted order.
2. It cannot work for columns without an inverted index.
3. It cannot work for measures.

Moreover, as you mentioned, it can reduce IO, but I don't think we can
reduce any IO, since we need to read all blocklets to do the merge sort. And I
am not sure how we can keep all the data in memory until we do the merge sort.
I still believe this work belongs to the execution engine, not the file
format. This type of specific improvement may give good performance for
some specific types of queries, but it will cause long-term complications
in maintainability.


Regards,
Ravindra.

On 30 March 2017 at 08:23, 马云  wrote:

> Hi Ravindran,
>
> Yes, Carbon does the sorting if the order by column is not the first column.
>
> But its sorting efficiency is very high, since the dimension data in a blocklet is
> stored in sorted order.
>
> So Carbon can use merge sort + top-N to get N rows from each block.
>
> In addition, the biggest difference is that it can reduce disk IO, since limit n
> can be used to reduce the number of required blocklets.
>
> If you only apply Spark's top-N, I don't think you can achieve the performance
> shown below.
>
> That's impossible without reducing disk IO.
>
>
>
>
>
>
>
>
> At 2017-03-30 03:12:54, "Ravindra Pesala"  wrote:
> >Hi,
> >
> >You mean Carbon does the sorting if the order by column is not the first column
> >and provides only the limit values to Spark. But Spark is already doing the same
> >job: it just sorts each partition and gets the top values out of it. You can
> >reduce the table_blocksize to get better sort performance, as Spark tries to do
> >the sorting in memory.
> >
> >I can see we can do some optimizations in the integration layer itself without
> >pushing down any logic to Carbon; for example, if the order by column is the first
> >column, then we can just get the limit values without sorting any data.
> >
> >Regards,
> >Ravindra.
> >
> >On 29 March 2017 at 08:58, 马云  wrote:
> >
> >> Hi Ravindran,
> >> Thanks for your quick response. Please see my answers below.
> >> 
> >>  What if the order by column is not the first column? It needs to scan all
> >> blocklets to get the data out of it if the order by column is not the first
> >> column of the MDK.
> >> 
> >> Answer :  If step2 doesn't filter any blocklet, you are right: it needs to
> >> scan all blocklets to get the data out if the order by column is not the
> >> first column of the MDK.
> >> But it only scans all of the order by column's data; for
> >> the other columns' data, it uses a lazy-load strategy and can reduce the scan
> >> according to the limit value.
> >> Hence you can see the performance is much better now
> >> after my optimization. Currently the CarbonData order by + limit
> >> performance is very bad since it scans all the data.
> >> In my test there are 20,000,000 rows and it takes more than
> >> 10s; if the data is much larger, I think it is hard for users to accept such
> >> bad performance when they do an order by + limit query.
> >>
> >>
> >> 
> >>  We used to have multiple push-down optimizations from Spark to Carbon,
> >> like aggregation, limit, topN etc. But later they were removed because they are
> >> very hard to maintain from version to version. I feel it is better that an
> >> execution engine like Spark does these types of operations.
> >> 
> >> Answer : In my opinion, I don't think "hard to maintain from version to
> >> version" is a good reason to give up the order by + limit optimization.
> >> We can create a new class that extends the current one and try to reduce the
> >> impact on the current code. Maybe that can make it easy to maintain.
> >> Maybe I am wrong.
> >>
> >>
> >>
> >>
> >> At 2017-03-29 02:21:58, "Ravindra Pesala"  wrote:
> >>
> >>
> >> Hi Jarck Ma,
> >>
> >> It is great to try optimizing Carbondata.
> >> I think this solution comes up with many limitations. What if the order by
> >> column is not the first column? It needs to scan all blocklets to get the
> >> data out of it if the order by column is not the first column of the MDK.
> >>
> >> We used to have multiple push-down optimizations from Spark to Carbon, like
> >> aggregation, limit, topN etc. But later they were removed because they are very
> >> hard to maintain from version to version. I feel it is better that an execution
> >> engine like Spark does these types of operations.
> >>
> >>
> >> Regards,
> >> Ravindra.
> >>
> >>
> >>
> >> On Tue, Mar 28, 2017, 14:28 马云  wrote:
> >>
> >>
> >> Hi Carbon Dev,
> >>
> >> Currently I have done optimization for ordering by 1 dimension.
> >>
> >> My local performance test is as below. Please 

Re:Re: Re: Optimize Order By + Limit Query

2017-03-29 Thread 马云
Hi Ravindran,

Yes, Carbon does the sorting if the order by column is not the first column. But its
sorting efficiency is very high, since the dimension data in a blocklet is stored in
sorted order. So Carbon can use merge sort + top-N to get N rows from each block.

In addition, the biggest difference is that it can reduce disk IO, since limit n can
be used to reduce the number of required blocklets. If you only apply Spark's top-N,
I don't think you can achieve the performance shown below. That's impossible without
reducing disk IO.












At 2017-03-30 03:12:54, "Ravindra Pesala"  wrote:
>Hi,
>
>You mean Carbon does the sorting if the order by column is not the first column
>and provides only the limit values to Spark. But Spark is already doing the same
>job: it just sorts each partition and gets the top values out of it. You can
>reduce the table_blocksize to get better sort performance, as Spark tries to do
>the sorting in memory.
>
>I can see we can do some optimizations in the integration layer itself without
>pushing down any logic to Carbon; for example, if the order by column is the first
>column, then we can just get the limit values without sorting any data.
>
>Regards,
>Ravindra.
>
>On 29 March 2017 at 08:58, 马云  wrote:
>
>> Hi Ravindran,
>> Thanks for your quick response. Please see my answers below.
>> 
>>  What if the order by column is not the first column? It needs to scan all
>> blocklets to get the data out of it if the order by column is not the first
>> column of the MDK.
>> 
>> Answer :  If step2 doesn't filter any blocklet, you are right: it needs to
>> scan all blocklets to get the data out if the order by column is not the
>> first column of the MDK.
>> But it only scans all of the order by column's data; for
>> the other columns' data, it uses a lazy-load strategy and can reduce the scan
>> according to the limit value.
>> Hence you can see the performance is much better now
>> after my optimization. Currently the CarbonData order by + limit
>> performance is very bad since it scans all the data.
>> In my test there are 20,000,000 rows and it takes more than
>> 10s; if the data is much larger, I think it is hard for users to accept such
>> bad performance when they do an order by + limit query.
>>
>>
>> 
>>  We used to have multiple push-down optimizations from Spark to Carbon,
>> like aggregation, limit, topN etc. But later they were removed because they are
>> very hard to maintain from version to version. I feel it is better that an
>> execution engine like Spark does these types of operations.
>> 
>> Answer : In my opinion, I don't think "hard to maintain from version to
>> version" is a good reason to give up the order by + limit optimization.
>> We can create a new class that extends the current one and try to reduce the
>> impact on the current code. Maybe that can make it easy to maintain.
>> Maybe I am wrong.
>>
>>
>>
>>
>> At 2017-03-29 02:21:58, "Ravindra Pesala"  wrote:
>>
>>
>> Hi Jarck Ma,
>>
>> It is great to try optimizing Carbondata.
>> I think this solution comes up with many limitations. What if the order by
>> column is not the first column? It needs to scan all blocklets to get the
>> data out of it if the order by column is not the first column of the MDK.
>>
>> We used to have multiple push-down optimizations from Spark to Carbon, like
>> aggregation, limit, topN etc. But later they were removed because they are very
>> hard to maintain from version to version. I feel it is better that an execution
>> engine like Spark does these types of operations.
>>
>>
>> Regards,
>> Ravindra.
>>
>>
>>
>> On Tue, Mar 28, 2017, 14:28 马云  wrote:
>>
>>
>> Hi Carbon Dev,
>>
>> Currently I have done optimization for ordering by 1 dimension.
>>
>> My local performance test is as below. Please give your suggestions.
>>
>>
>>
>>
>> | data count | test sql | limit value in sql | optimized code (ms) | original code (ms) |
>> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY country limit 1000 | 1000 | 677 | 10906 |
>> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY serialname limit 1 | 1 | 1897 | 12108 |
>> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY serialname limit 5 | 5 | 2814 | 14279 |
>>
>> My optimization solution for order by 1 dimension + limit is as below:
>>
>> mainly filter out some unnecessary blocklets and leverage the fact that the
>> dimension is stored in sorted order to get sorted data in each partition.
>>
>> At last, use TakeOrderedAndProject to merge the sorted data from the partitions.
>>
>> step1. Change the logical plan and push down the order by and limit
>> information to the carbon scan,
>>
>> and change the Sort physical plan to TakeOrderedAndProject  

Re: Load data into carbondata executors distributed unevenly

2017-03-29 Thread Ravindra Pesala
Hi,

It seems the attachments are missing. Can you attach them again?

Regards,
Ravindra.

On 30 March 2017 at 08:02, a  wrote:

> Hello!
>
> *Test result:*
> When I load CSV data into a carbondata table 3 times, the executors are
> distributed unevenly. My purpose is one node, one task, but the result is that
> some nodes have 2 tasks and some nodes have no task.
> See load data 1.png, data 2.png, data 3.png.
> The carbondata data.PNG shows the data structure in Hadoop.
>
> Loading 4   records into the carbondata table takes 2629 seconds, which is
> too long.
>
> *Question:*
> How can I make the executors distribute evenly?
>
> The environment:
> Spark 2.1 + CarbonData 1.1; there are 7 datanodes.
>
> ./bin/spark-shell \
>   --master yarn \
>   --deploy-mode client \
>   --num-executors n \   (the first time n is 7 (result in load data 1.png), the
>   second time 6 (result in load data 2.png), the third time 8 (result in load data 3.png))
>   --executor-cores 10 \
>   --executor-memory 40G \
>   --driver-memory 8G
>
> carbon.properties
>  DataLoading Configuration 
> carbon.sort.file.buffer.size=20
> carbon.graph.rowset.size=1
> carbon.number.of.cores.while.loading=10
> carbon.sort.size=5
> carbon.number.of.cores.while.compacting=10
> carbon.number.of.cores=10
>
> Best regards!
>
>
>
>
>
>
>



-- 
Thanks & Regards,
Ravi


Load data into carbondata executors distributed unevenly

2017-03-29 Thread a
Hello!


Test result:
When I load CSV data into a carbondata table 3 times, the executors are distributed
unevenly. My purpose is one node, one task, but the result is that some nodes have 2
tasks and some nodes have no task.
See load data 1.png, data 2.png, data 3.png.
The carbondata data.PNG shows the data structure in Hadoop.


Loading 4   records into the carbondata table takes 2629 seconds, which is too
long.


Question:
How can I make the executors distribute evenly?


The environment:

Spark 2.1 + CarbonData 1.1; there are 7 datanodes.


./bin/spark-shell   \
--master yarn \
--deploy-mode client  \
--num-executors n \   (the first time n is 7 (result in load data 1.png), the second
time 6 (result in load data 2.png), the third time 8 (result in load
data 3.png))
--executor-cores 10 \
--executor-memory 40G \
--driver-memory 8G \


carbon.properties
 DataLoading Configuration 
carbon.sort.file.buffer.size=20
carbon.graph.rowset.size=1
carbon.number.of.cores.while.loading=10
carbon.sort.size=5
carbon.number.of.cores.while.compacting=10
carbon.number.of.cores=10


Best regards!







Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

2017-03-29 Thread Aniket Adnaik
Hi Jacky,

Thanks for your comments. I guess I should have uploaded it in Google Doc
format instead of PDF; somehow Google Docs messes up all the diagrams if I
copy-paste, and I have not figured out a way to fix it. Anyway, I apologize
for the inconvenience to those who wanted to add in-line comments in the
document. For now, I will try to address your questions through email below
and see if I can upload a commentable version to Google Docs.

Context: The table status file in the metadata folder will be used to indicate
the status of streaming ingestion, such as in-progress or
successful.
Jacky>> Does this mean the status file needs to be rewritten for every mini
batch?
AA>> Currently yes; this may not be so efficient, and we can think about keeping this
info in the metastore. Let me know if you have any ideas.

Context: Only one writer (executor) is allowed to write into a streaming data
file.
Jacky>> Every writer writes to one file, but there could be parallel writers
writing to different files, right?
AA>> Yes, there could be parallel executors writing to different files.

Context: BlockHeader
Jacky>> In BlockletHeader, I think two more fields need to be added:
i64 blocklet_offset and list datachunks; DataChunk3 contains
the min/max of each column in each page. mutation, blockletindex,
blockletinfo, dictionary - these are not required.
AA>> Yes, this probably needs more refinement and discussion. I was
thinking more along the lines of using the existing V3 format rather than adding a new
one.

Context: Approach-2 for file format
Jacky>> Is this metadata file appendable? It should not have a footer then.
And how to maintain the locality of this file and the stream file together?
AA>> Yes, the metadata file will be appendable. The footer will be added when the file is
complete. Ideally, co-location with the base streaming file would be the best
case; I am not sure whether the HDFS data placement policy provides any configuration for that.

Context: Write Flow diagram
Jacky>> 1. In structured streaming, doesn't the executor receive events
directly from the streaming source?
AA>> Yes. After the receiver is set up, the driver will have a StreamingQueryListener
to communicate with the executors. I will add arrows from the source to the executors
to make this clearer.
2. Is the metadata protected by some lock? How do 2 executors write to it
simultaneously?
AA>> Yes, the metadata will be protected by a lock. Again, we need to explore a
more efficient way if there is one.
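
To make the locking discussion above concrete, here is a minimal Scala sketch of one way such a lock could be built on HDFS atomic file creation; the lock path and the withLock helper are hypothetical illustrations, not an existing CarbonData API.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: a crude mutual-exclusion primitive based on atomic file
// creation, so that a single executor updates the streaming metadata at a time.
object MetadataLockSketch {
  def withLock[T](lockDir: String)(body: => T): T = {
    val fs = FileSystem.get(new Configuration())
    val lockFile = new Path(lockDir, "metadata.lock")  // assumed lock location
    // create(path, overwrite = false) fails if the lock file already exists,
    // so only one writer gets past this line at a time.
    fs.create(lockFile, false).close()
    try body
    finally fs.delete(lockFile, false)
  }
}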

Context: Read Consistency
Jacky>> I think more analysis is needed here; what about a query consisting of 2
scan operations in different stages?
AA>> Need to check on that. My assumption is that we have only one query
start timestamp, which can be utilized.

Context: Compaction
Jacky>> Can we have some policy so that the user does not need to trigger it manually?
AA>> Yes, this needs to be configurable based on the number of streaming files.

Context: Spark Structured Streaming info/background, "No aggregation
supported"
Jacky>> You mean no aggregate query is allowed?
AA>> This limit is on the writer side; it means that Spark writeStream with the file
sink for Parquet does not support performing aggregation before writing to the
file sink. Once the data is written, it should be queryable with aggregate
queries.
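
As a concrete illustration of that writer-side limitation, below is a minimal Structured Streaming sketch that writes each mini batch straight to a file sink with no aggregation before it; it uses the existing Parquet file sink as a stand-in for a future CarbonData streaming sink, and the paths and schema are made-up placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object FileSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-file-sink-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumed input schema; every CSV file dropped into the source path becomes a mini batch.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("country", StringType),
      StructField("salary", IntegerType)))

    val input = spark.readStream.schema(schema).csv("/tmp/stream-in")

    // File sink: no aggregation is applied before the sink; each mini batch is
    // appended as new files under the output path.
    val query = input.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/stream-checkpoint")
      .option("path", "/tmp/stream-out")
      .start()

    query.awaitTermination()
  }
}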

Best Regards,
Aniket

On Wed, Mar 29, 2017 at 8:46 AM, Jacky Li  wrote:

> Hi Aniket,
>
> Comment inline
> And I have put some review comment in the PDF here:
> https://drive.google.com/file/d/0B5vjWGChUwXdSUV0OTFkTGE4am8/view?usp=sharing
>
> > On 29 March 2017, at 07:10, Aniket Adnaik wrote:
> >
> > Hi Jacky,
> >
> > Please see my comments below;
> > 1. In this phase, is it still using columnar format? Save to a file for
> > every mini batch? If so, it is only readable after the file has been
> closed
> > and some metadata need to be kept to indicate the availability of the new
> > file.
> >
> > AA >> Yes, for the initial phase it will use the default columnar format and save
> > to a file every mini batch. Closing the file may not be needed, as HDFS allows
> > a single writer with multiple readers. But yes, it will require us to maintain a
> > streaming_status file to let readers know about the valid timestamp and offsets
> > during getsplits.
> >
> > 2. How to map the partition concept in spark to files in streaming
> segment?
> > I guess some small file will be created, right?
> >
> > AA>> In the streaming context, writeStream.partitionBy() may require CarbonData
> > to create a separate folder for each partition.
> > The folder may look like  \TableName\_Fact\part0\StreamingSegment\
> > *partition_0\streamingfile.001*
> > However, I am not sure how carbondata will utilize this partition info, as
> > my assumption is that currently CarbonData does not support partitioning. Also, I
> > am not sure whether an existing table with no partitioning schema can work well.
> > This needs further analysis.
> >
>
> Currently carbon does not support partitioning yet, but we do have future
> plans for partitioning, for the bulk-load scenario. The 

Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

2017-03-29 Thread Aniket Adnaik
Hi Liang,

Thanks, please see my comments to your questions.

2. Whether to support compaction for streaming ingested data to add an index, or
not?
AA>> Yes. Eventually we would need the streaming data files to be compacted
into the regular read-optimized CarbonData format. Triggering of compaction can
be based on the number of files in the streaming segment.
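
A minimal sketch of such a file-count based trigger is below; the threshold, directory layout, and helper are hypothetical placeholders rather than an existing CarbonData API.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: decide whether a streaming segment has accumulated enough
// files to be worth compacting into the regular read-optimized format.
object StreamingCompactionTriggerSketch {
  val fileCountThreshold = 16  // assumed configurable threshold

  def shouldCompact(streamingSegmentDir: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    val fileCount = fs.listStatus(new Path(streamingSegmentDir)).count(_.isFile)
    fileCount >= fileCountThreshold
  }
}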

3. For the first version of the streaming ingestion feature, which kinds
of streaming processing systems will be supported?
Structured Streaming and Kafka? Any others?
AA>> For the first phase we can support the file source and the socket source. For
Kafka as a streaming source, there are some additional functionalities that need
to be covered, like partitioning, Kafka offset management, and consistency
with carbon streaming ingestion, so we may defer it to a later phase.

Best Regards,
Aniket


On Wed, Mar 29, 2017 at 2:00 AM, Liang Chen  wrote:

> Hi Aniket
>
> Thanks for your great contribution. The feature of ingesting streaming data
> into carbondata would be very useful for some real-time query scenarios.
>
> Some inputs from my side:
>
> 1. I agree with approach 2 for the streaming file format; query performance
> must be ensured.
> 2. Whether to support compaction for streaming ingested data to add an index, or
> not?
> 
> 
> CarbonData shall use write optimized format (instead of multi-layered
> indexed columnar format) to support ingestion of streaming data into a
> CarbonData table.
>
> 3. For the first version of the streaming ingestion feature, which kinds
> of streaming processing systems will be supported?
> Structured Streaming and Kafka? Any others?
>
> Regards
> Liang
>
>
> Aniket Adnaik wrote
> > Hi All,
> >
> > I would like to open up a discussion for new feature to support streaming
> > ingestion in CarbonData.
> >
> > Please refer to design document(draft) in the link below.
> >   https://drive.google.com/file/d/0B71_EuXTdDi8MlFDU2tqZU9BZ3M
> > /view?usp=sharing
> >
> > Your comments/suggestions are welcome.
> > Here are some high level points.
> >
> > Rationale:
> > The current ways of adding user data to a CarbonData table are via the LOAD
> > statement or using a SELECT query with an INSERT INTO statement. These methods
> > add a bulk of data into the CarbonData table as a new segment. Basically, it is
> > a batch insertion for a bulk of data. However, with the increasing demand for
> > real-time data analytics with streaming frameworks, CarbonData needs a way
> > to insert streaming data continuously into a CarbonData table. CarbonData
> > needs support for continuous and faster ingestion into the CarbonData table,
> > making it available for querying.
> >
> > CarbonData can leverage our newly introduced V3 format to append
> > streaming data to an existing carbon table.
> >
> >
> > Requirements:
> >
> > Following are some high level requirements;
> > 1.  CarbonData shall create a new segment (Streaming Segment) for each
> > streaming session. Concurrent streaming ingestion into same table will
> > create separate streaming segments.
> >
> > 2.  CarbonData shall use write optimized format (instead of multi-layered
> > indexed columnar format) to support ingestion of streaming data into a
> > CarbonData table.
> >
> > 3.  CarbonData shall create streaming segment folder and open a streaming
> > data file in append mode to write data. CarbonData should avoid creating
> > multiple small files by appending to an existing file.
> >
> > 4.  The data stored in new streaming segment shall be available for query
> > after it is written to the disk (hflush/hsync). In other words,
> CarbonData
> > Readers should be able to query the data in streaming segment written so
> > far.
> >
> > 5.  CarbonData should acknowledge the write operation status back to
> > output
> > sink/upper layer streaming engine so that in the case of write failure,
> > streaming engine should restart the operation and maintain exactly once
> > delivery semantics.
> >
> > 6.  CarbonData Compaction process shall support compacting data from
> > write-optimized streaming segment to regular read optimized columnar
> > CarbonData format.
> >
> > 7.  CarbonData readers should maintain the read consistency by means of
> > using timestamp.
> >
> > 8.  Maintain durability - in case of write failure, CarbonData should be
> > able to recover to the latest commit status. This may require maintaining the source
> > and destination offsets of the last commits in metadata.
> >
> > This feature can be done in phases;
> >
> > Phase -1 : Add basic framework and writer support to allow Spark
> > Structured
> > streaming into CarbonData . This phase may or may not have append
> support.
> > Add reader support to read streaming data files.
> >
> > Phase-2 : Add append support if not done in phase 1. Maintain append
> > offsets and metadata information.
> >
> > Phase -3 : Add support for external streaming frameworks such as Kafka
> > streaming using 

Re: Re: Optimize Order By + Limit Query

2017-03-29 Thread Ravindra Pesala
Hi,

You mean Carbon does the sorting if the order by column is not the first column
and provides only the limit values to Spark. But Spark is already doing the same
job: it just sorts each partition and gets the top values out of it. You can
reduce the table_blocksize to get better sort performance, as Spark tries to do
the sorting in memory.

I can see we can do some optimizations in the integration layer itself without
pushing down any logic to Carbon; for example, if the order by column is the first
column, then we can just get the limit values without sorting any data.

Regards,
Ravindra.

On 29 March 2017 at 08:58, 马云  wrote:

> Hi Ravindran,
> Thanks for your quick response. Please see my answers below.
> 
>  What if the order by column is not the first column? It needs to scan all
> blocklets to get the data out of it if the order by column is not the first
> column of the MDK.
> 
> Answer :  If step2 doesn't filter any blocklet, you are right: it needs to
> scan all blocklets to get the data out if the order by column is not the
> first column of the MDK.
> But it only scans all of the order by column's data; for
> the other columns' data, it uses a lazy-load strategy and can reduce the scan
> according to the limit value.
> Hence you can see the performance is much better now
> after my optimization. Currently the CarbonData order by + limit
> performance is very bad since it scans all the data.
> In my test there are 20,000,000 rows and it takes more than
> 10s; if the data is much larger, I think it is hard for users to accept such
> bad performance when they do an order by + limit query.
>
>
> 
>  We used to have multiple push-down optimizations from Spark to Carbon,
> like aggregation, limit, topN etc. But later they were removed because they are
> very hard to maintain from version to version. I feel it is better that an
> execution engine like Spark does these types of operations.
> 
> Answer : In my opinion, I don't think "hard to maintain from version to
> version" is a good reason to give up the order by + limit optimization.
> We can create a new class that extends the current one and try to reduce the
> impact on the current code. Maybe that can make it easy to maintain.
> Maybe I am wrong.
>
>
>
>
> At 2017-03-29 02:21:58, "Ravindra Pesala"  wrote:
>
>
> Hi Jarck Ma,
>
> It is great to try optimizing Carbondata.
> I think this solution comes up with many limitations. What if the order by
> column is not the first column? It needs to scan all blocklets to get the
> data out of it if the order by column is not the first column of the MDK.
>
> We used to have multiple push-down optimizations from Spark to Carbon, like
> aggregation, limit, topN etc. But later they were removed because they are very
> hard to maintain from version to version. I feel it is better that an execution
> engine like Spark does these types of operations.
>
>
> Regards,
> Ravindra.
>
>
>
> On Tue, Mar 28, 2017, 14:28 马云  wrote:
>
>
> Hi Carbon Dev,
>
> Currently I have done optimization for ordering by 1 dimension.
>
> My local performance test is as below. Please give your suggestions.
>
>
>
>
> | data count | test sql | limit value in sql | optimized code (ms) | original code (ms) |
> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY country limit 1000 | 1000 | 677 | 10906 |
> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY serialname limit 1 | 1 | 1897 | 12108 |
> | 20,000,000 | SELECT name, serialname, country, salary, id, date FROM t3 ORDER BY serialname limit 5 | 5 | 2814 | 14279 |
>
> My optimization solution for order by 1 dimension + limit is as below:
>
> mainly filter out some unnecessary blocklets and leverage the fact that the
> dimension is stored in sorted order to get sorted data in each partition.
>
> At last, use TakeOrderedAndProject to merge the sorted data from the partitions.
>
> step1. Change the logical plan and push down the order by and limit
> information to the carbon scan,
>
> and change the Sort physical plan to TakeOrderedAndProject, since the
> data will be fetched and sorted in each partition.
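
To illustrate the shape of step1 (each partition produces a bounded, sorted set of candidates that a TakeOrderedAndProject-style step then merges), here is a minimal self-contained Spark sketch; it is not the pushed-down CarbonData code, and the generated data, column name, and limit value are illustrative.

import org.apache.spark.sql.SparkSession

object OrderByLimitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("orderby-limit-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val limit = 1000
    // Stand-in for the scanned dimension values; in CarbonData each blocklet is
    // already sorted on the dimension, so a partition could emit its first
    // `limit` rows directly instead of sorting here.
    val rows = sc.parallelize((1 to 1000000).map(i => (f"serialname$i%08d", i)), 8)

    // Each partition contributes at most `limit` candidate rows.
    val candidates = rows.mapPartitions(_.toArray.sortBy(_._1).take(limit).iterator)

    // Merge the per-partition candidates, analogous to TakeOrderedAndProject.
    val topN = candidates.takeOrdered(limit)(Ordering.by[(String, Int), String](_._1))
    println(topN.take(3).mkString(", "))

    spark.stop()
  }
}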
>
> step2. In each partition, apply the limit number and the blocklet's min/max index
> to filter blocklets.
>
>   It can reduce the scanned data if some blocklets are filtered out.
>
>  For example,  SELECT name, serialname, country, salary, id, date
> FROM t3 ORDER BY serialname limit 1
>
>  Supposing there are 2 blocklets, each with 32000 rows, where serialname is
> between serialname1 and serialname2 in the first blocklet
>
> and between serialname2 and serialname3 in the second blocklet, actually
> we only need to scan the first blocklet,
>
> since 32000 > 100 and the first blocklet's serialname <= the second blocklet's
> serialname.
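
A minimal sketch of the step2 blocklet-pruning idea follows; the BlockletMeta shape and the numbers mirror the example above but are illustrative (the real min/max values come from CarbonData's blocklet index), and this is not the actual implementation.

// Illustrative only: keep blocklets in ascending order of their minimum value and
// stop once the kept blocklets already cover `limit` rows; later blocklets cannot
// contribute to the top-N on the order-by dimension.
case class BlockletMeta(rowCount: Int, min: String, max: String)

object BlockletPruningSketch {
  def prune(blocklets: Seq[BlockletMeta], limit: Int): Seq[BlockletMeta] = {
    var covered = 0
    blocklets.sortBy(_.min).takeWhile { b =>
      val keep = covered < limit
      covered += b.rowCount
      keep
    }
  }

  def main(args: Array[String]): Unit = {
    val blocklets = Seq(
      BlockletMeta(32000, "serialname1", "serialname2"),
      BlockletMeta(32000, "serialname2", "serialname3"))
    // With a limit of 100, only the first blocklet needs to be scanned.
    println(prune(blocklets, 100))
  }
}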
>
>
>
> 

Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

2017-03-29 Thread Jacky Li
Hi Aniket,

Comments inline.
And I have put some review comments in the PDF here:
https://drive.google.com/file/d/0B5vjWGChUwXdSUV0OTFkTGE4am8/view?usp=sharing 


> On 29 March 2017, at 07:10, Aniket Adnaik wrote:
> 
> Hi Jacky,
> 
> Please see my comments below;
> 1. In this phase, is it still using columnar format? Save to a file for
> every mini batch? If so, it is only readable after the file has been closed
> and some metadata need to be kept to indicate the availability of the new
> file.
> 
> AA >> Yes, for the initial phase it will use the default columnar format and save
> to a file every mini batch. Closing the file may not be needed, as HDFS allows
> a single writer with multiple readers. But yes, it will require us to maintain a
> streaming_status file to let readers know about the valid timestamp and offsets
> during getsplits.
> 
> 2. How to map the partition concept in spark to files in streaming segment?
> I guess some small file will be created, right?
> 
> AA>> In the streaming context, writeStream.partitionBy() may require CarbonData
> to create a separate folder for each partition.
> The folder may look like  \TableName\_Fact\part0\StreamingSegment\
> *partition_0\streamingfile.001*
> However, I am not sure how carbondata will utilize this partition info, as
> my assumption is that currently CarbonData does not support partitioning. Also, I
> am not sure whether an existing table with no partitioning schema can work well.
> This needs further analysis.
> 

Currently carbon does not support partitioning yet, but we do have future plans for
partitioning, for the bulk-load scenario. The draft idea is to add a partition
step before the input step in the current loading pipeline framework. And the
folder structure may look like: \TableName\Fact\part0\segment0. I will describe
it in another thread. I think users can use the same partition key for both
bulk load and streaming ingest.


> 3. Phase-2 : Add append support if not done in phase 1. Maintain append 
> offsets
> and metadata information.
> Is the streaming data file format implemented in this phase?
> AA>>  I think we can directly leverage the existing V3 format without many
> changes in the basic writer/reader framework; in that case, implementing a
> streaming file format is a possibility.
> 
> Best Regards,
> Aniket
> 
> On Tue, Mar 28, 2017 at 8:22 AM, Jacky Li  wrote:
> 
>> Hi Aniket,
>> 
>> This feature looks great, the overall plan also seems fine to me. Thanks
>> for proposing it.
>> And I have some doubts inline.
>> 
>>> On 27 March 2017, at 18:34, Aniket Adnaik wrote:
>>> 
>>> Hi All,
>>> 
>>> I would like to open up a discussion for new feature to support streaming
>>> ingestion in CarbonData.
>>> 
>>> Please refer to design document(draft) in the link below.
>>> https://drive.google.com/file/d/0B71_EuXTdDi8MlFDU2tqZU9BZ3M
>>> /view?usp=sharing
>>> 
>>> Your comments/suggestions are welcome.
>>> Here are some high level points.
>>> 
>>> Rationale:
>>> The current ways of adding user data to a CarbonData table are via the LOAD
>>> statement or using a SELECT query with an INSERT INTO statement. These methods
>>> add a bulk of data into the CarbonData table as a new segment. Basically, it is
>>> a batch insertion for a bulk of data. However, with the increasing demand for
>>> real-time data analytics with streaming frameworks, CarbonData needs a way
>>> to insert streaming data continuously into a CarbonData table. CarbonData
>>> needs support for continuous and faster ingestion into the CarbonData table,
>>> making it available for querying.
>>> 
>>> CarbonData can leverage our newly introduced V3 format to append
>>> streaming data to an existing carbon table.
>>> 
>>> 
>>> Requirements:
>>> 
>>> Following are some high level requirements;
>>> 1.  CarbonData shall create a new segment (Streaming Segment) for each
>>> streaming session. Concurrent streaming ingestion into same table will
>>> create separate streaming segments.
>>> 
>>> 2.  CarbonData shall use write optimized format (instead of multi-layered
>>> indexed columnar format) to support ingestion of streaming data into a
>>> CarbonData table.
>>> 
>>> 3.  CarbonData shall create streaming segment folder and open a streaming
>>> data file in append mode to write data. CarbonData should avoid creating
>>> multiple small files by appending to an existing file.
>>> 
>>> 4.  The data stored in new streaming segment shall be available for query
>>> after it is written to the disk (hflush/hsync). In other words,
>> CarbonData
>>> Readers should be able to query the data in streaming segment written so
>>> far.
>>> 
>>> 5.  CarbonData should acknowledge the write operation status back to
>> output
>>> sink/upper layer streaming engine so that in the case of write failure,
>>> streaming engine should restart the operation and maintain exactly once
>>> delivery semantics.
>>> 
>>> 6.  CarbonData Compaction process 

[jira] [Created] (CARBONDATA-832) Data loading is failing with duplicate header column in csv file

2017-03-29 Thread kumar vishal (JIRA)
kumar vishal created CARBONDATA-832:
---

 Summary: Data loading is failing with duplicate header column in 
csv file
 Key: CARBONDATA-832
 URL: https://issues.apache.org/jira/browse/CARBONDATA-832
 Project: CarbonData
  Issue Type: Bug
Reporter: kumar vishal
Assignee: kumar vishal


Problem: Data mismatch issue when the CSV file has a duplicate column header.
Solution: Fix the row parser implementation's logic for getting the column indexes, which has an issue.
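
A small Scala sketch of the class of problem (not the actual row-parser code): with a duplicate column name in the CSV header, a naive name-to-index map silently keeps only one occurrence, so indexes have to be tracked per occurrence.

object DuplicateHeaderSketch {
  def main(args: Array[String]): Unit = {
    val header = Array("id", "name", "name", "salary")

    // Problematic approach: toMap keeps only the last index seen for "name".
    val lastIndexByName = header.zipWithIndex.toMap
    println(lastIndexByName)   // Map(id -> 0, name -> 2, salary -> 3)

    // Position-aware approach: keep every occurrence of each column name.
    val allIndexesByName = header.zipWithIndex.groupBy(_._1).map {
      case (name, pairs) => name -> pairs.map(_._2).toList
    }
    println(allIndexesByName)  // "name" maps to List(1, 2)
  }
}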



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: carbondata hive

2017-03-29 Thread Sea
set hive.mapred.supports.subdirectories=true;
set mapreduce.input.fileinputformat.input.dir.recursive=true;


-- Original --
From:  "261810726";<261810...@qq.com>;
Date:  Wed, Mar 29, 2017 07:42 PM
To:  "dev"; 

Subject:  Re: carbondata hive 



Hi, fengyun:
please follow the steps below:


https://github.com/cenyuhai/incubator-carbondata/blob/CARBONDATA-727/integration/hive/hive-guide.md




-- Original --
From:  "";<1141982...@qq.com>;
Date:  Wed, Mar 29, 2017 09:21 AM
To:  "dev"; 

Subject:  carbondata hive 



How to synchronize carbondata data with hive

Re: carbondata hive

2017-03-29 Thread Sea
Hi, fengyun:
please follow the steps below:


https://github.com/cenyuhai/incubator-carbondata/blob/CARBONDATA-727/integration/hive/hive-guide.md




-- Original --
From:  "";<1141982...@qq.com>;
Date:  Wed, Mar 29, 2017 09:21 AM
To:  "dev"; 

Subject:  carbondata hive 



How to synchronize carbondata data with hive

[jira] [Created] (CARBONDATA-831) can't run PerfTest example

2017-03-29 Thread sehriff (JIRA)
sehriff created CARBONDATA-831:
--

 Summary: can't run PerfTest example
 Key: CARBONDATA-831
 URL: https://issues.apache.org/jira/browse/CARBONDATA-831
 Project: CarbonData
  Issue Type: Bug
Reporter: sehriff


Cannot run PerfTest in
incubator-carbondata-apache-carbondata-1.0.0-incubating-rc2\examples\spark\src\main\scala\org\apache\carbondata\examples\
Error log:
Exception in thread "main" java.lang.NoSuchMethodError: 
scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
at 
org.apache.spark.util.ScalaCompilerUtil$.compiledCode(ScalaCompilerUtil.scala:29)
at 
org.apache.spark.sql.CodeGenerateFactory.(CodeGenerateFactory.scala:32)
at 
org.apache.spark.sql.CodeGenerateFactory$.init(CodeGenerateFactory.scala:67)
at org.apache.spark.sql.CarbonContext.(CarbonContext.scala:58)
at 
org.apache.carbondata.examples.util.ExampleUtils$.createCarbonContext(ExampleUtils.scala:44)
at org.apache.carbondata.examples.PerfTest$.main(PerfTest.scala:271)
at org.apache.carbondata.examples.PerfTest.main(PerfTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

2017-03-29 Thread Liang Chen
Hi Aniket

Thanks for your great contribution. The feature of ingesting streaming data
into carbondata would be very useful for some real-time query scenarios.

Some inputs from my side:

1. I agree with approach 2 for the streaming file format; query performance
must be ensured.
2. Whether to support compaction for streaming ingested data to add an index, or
not?

CarbonData shall use write optimized format (instead of multi-layered 
indexed columnar format) to support ingestion of streaming data into a 
CarbonData table. 

3. For the first version of the streaming ingestion feature, which kinds
of streaming processing systems will be supported?
Structured Streaming and Kafka? Any others?

Regards
Liang


Aniket Adnaik wrote
> Hi All,
> 
> I would like to open up a discussion for new feature to support streaming
> ingestion in CarbonData.
> 
> Please refer to design document(draft) in the link below.
>   https://drive.google.com/file/d/0B71_EuXTdDi8MlFDU2tqZU9BZ3M
> /view?usp=sharing
> 
> Your comments/suggestions are welcome.
> Here are some high level points.
> 
> Rationale:
> The current ways of adding user data to a CarbonData table are via the LOAD
> statement or using a SELECT query with an INSERT INTO statement. These methods
> add a bulk of data into the CarbonData table as a new segment. Basically, it is
> a batch insertion for a bulk of data. However, with the increasing demand for
> real-time data analytics with streaming frameworks, CarbonData needs a way
> to insert streaming data continuously into a CarbonData table. CarbonData
> needs support for continuous and faster ingestion into the CarbonData table,
> making it available for querying.
> 
> CarbonData can leverage our newly introduced V3 format to append
> streaming data to an existing carbon table.
> 
> 
> Requirements:
> 
> Following are some high level requirements;
> 1.  CarbonData shall create a new segment (Streaming Segment) for each
> streaming session. Concurrent streaming ingestion into same table will
> create separate streaming segments.
> 
> 2.  CarbonData shall use write optimized format (instead of multi-layered
> indexed columnar format) to support ingestion of streaming data into a
> CarbonData table.
> 
> 3.  CarbonData shall create streaming segment folder and open a streaming
> data file in append mode to write data. CarbonData should avoid creating
> multiple small files by appending to an existing file.
> 
> 4.  The data stored in new streaming segment shall be available for query
> after it is written to the disk (hflush/hsync). In other words, CarbonData
> Readers should be able to query the data in streaming segment written so
> far.
> 
> 5.  CarbonData should acknowledge the write operation status back to
> output
> sink/upper layer streaming engine so that in the case of write failure,
> streaming engine should restart the operation and maintain exactly once
> delivery semantics.
> 
> 6.  CarbonData Compaction process shall support compacting data from
> write-optimized streaming segment to regular read optimized columnar
> CarbonData format.
> 
> 7.  CarbonData readers should maintain the read consistency by means of
> using timestamp.
> 
> 8.  Maintain durability - in case of write failure, CarbonData should be
> able to recover to the latest commit status. This may require maintaining the source
> and destination offsets of the last commits in metadata.
> 
> This feature can be done in phases;
> 
> Phase -1 : Add basic framework and writer support to allow Spark
> Structured
> streaming into CarbonData . This phase may or may not have append support.
> Add reader support to read streaming data files.
> 
> Phase-2 : Add append support if not done in phase 1. Maintain append
> offsets and metadata information.
> 
> Phase -3 : Add support for external streaming frameworks such as Kafka
> streaming using Spark Structured Streaming, maintain
> topics/partitions/offsets, and support fault tolerance.
> 
> Phase-4 : Add support to other streaming frameworks , such as flink , beam
> etc.
> 
> Phase-5: Future support for in-memory cache for buffering streaming data,
> support for union with Spark Structured streaming to serve directly from
> spark structured streaming.  And add support for Time series data.
> 
> Best Regards,
> Aniket





--
View this message in context: 
http://apache-carbondata-mailing-list-archive.1130556.n5.nabble.com/DISCUSSION-New-Feature-Streaming-Ingestion-into-CarbonData-tp9724p9803.html
Sent from the Apache CarbonData Mailing List archive mailing list archive at 
Nabble.com.


Re: [DISCUSSION] Initiating Apache CarbonData-1.1.0 incubating Release

2017-03-29 Thread Henry Saputra
Sure, let's do one more release.

+1

On Mon, Mar 27, 2017 at 2:58 AM, manish gupta 
wrote:

> +1
>
> Regards
> Manish Gupta
>
> On Mon, Mar 27, 2017 at 2:41 PM, Kumar Vishal 
> wrote:
>
> > +1
> > -Regards
> > Kumar Vishal
> >
> > On Mar 27, 2017 09:31, "xm_zzc" <441586...@qq.com> wrote:
> >
> > > Hi, Liang:
> > >   Thanks for your reply.
> > >
> > >
> > >
> > > --
> > > View this message in context: http://apache-carbondata-
> > > mailing-list-archive.1130556.n5.nabble.com/Re-DISCUSSION-
> > > Initiating-Apache-CarbonData-1-1-0-incubating-Release-tp9672p9680.html
> > > Sent from the Apache CarbonData Mailing List archive mailing list
> archive
> > > at Nabble.com.
> > >
> >
>


[jira] [Created] (CARBONDATA-830) Incorrect schedule for NewCarbonDataLoadRDD

2017-03-29 Thread Weizhong (JIRA)
Weizhong created CARBONDATA-830:
---

 Summary: Incorrect schedule for NewCarbonDataLoadRDD
 Key: CARBONDATA-830
 URL: https://issues.apache.org/jira/browse/CARBONDATA-830
 Project: CarbonData
  Issue Type: Bug
  Components: spark-integration
Affects Versions: 1.0.0-incubating
 Environment: Spark 2.1 + Carbon 1.0.0
Reporter: Weizhong
Assignee: Weizhong
Priority: Minor


Currently NewCarbonDataLoadRDD's getPreferredLocations returns all locations
rather than 1, so Spark may pick the same node for two tasks. As a result, one node
gets overloaded with tasks while another has no task to do, impacting
performance even in the absence of any failure.
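
A minimal standalone sketch of the intended behavior (not CarbonData's actual NewCarbonDataLoadRDD): report exactly one preferred host per partition so the scheduler spreads loading tasks across nodes; the host names are illustrative.

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class SingleLocationRDD(sc: SparkContext, hosts: Array[String])
  extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(i => new Partition { override val index: Int = i })

  // One host per partition (instead of every replica location), so Spark tends
  // to schedule roughly one loading task per node.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}

object SingleLocationRDDDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("locality-sketch").setMaster("local[*]"))
    val rdd = new SingleLocationRDD(sc, Array("node1", "node2", "node3"))
    println(rdd.collect().toList)
    sc.stop()
  }
}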



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)