Re: Should Flume integration be behind a profile?

2017-10-02 Thread Sean Owen
CCing user@
Yeah good point about perhaps moving the examples into the module itself.
Actually removing it would be a long way off, no matter what.

On Mon, Oct 2, 2017 at 8:35 AM Nick Pentreath 
wrote:

> I'd agree with #1 or #2. Deprecation now seems fine.
>
> Perhaps this should be raised on the user list also?
>
> And perhaps it makes sense to look at moving the Flume support into Apache
> Bahir if there is interest (I've cc'ed Bahir dev list here)? That way the
> current state of the connector could keep going for those users who may
> need it.
>
> As for examples, for the Kinesis connector the examples now live in the
> subproject (see e.g. KinesisWordCountASL under external/kinesis-asl). So we
> don't have to completely remove the examples, just move them (this may not
> solve the doc issue but at least the examples are still there for anyone
> who needs them).
>
> On Mon, 2 Oct 2017 at 06:36 Mridul Muralidharan  wrote:
>
>> I agree, proposal 1 sounds better among the options.
>>
>> Regards,
>> Mridul
>>
>>
>> On Sun, Oct 1, 2017 at 3:50 PM, Reynold Xin  wrote:
>> > Probably should do 1, and then it is an easier transition in 3.0.
>> >
>> > On Sun, Oct 1, 2017 at 1:28 AM Sean Owen  wrote:
>> >>
>> >> I tried and failed to do this in
>> >> https://issues.apache.org/jira/browse/SPARK-22142 because it became
>> >> clear that the Flume examples would have to be removed to make this
>> >> work, too. (Well, you can imagine other solutions with extra source
>> >> dirs or modules for flume examples enabled by a profile, but that
>> >> doesn't help the docs and is nontrivial complexity for little gain.)
>> >>
>> >> It kind of suggests Flume support should be deprecated if it's put
>> >> behind a profile. Like with Kafka 0.8. (This is why I'm raising it
>> >> again to the whole list.)
>> >>
>> >> Any preferences among:
>> >> 1. Put Flume behind a profile, remove examples, deprecate
>> >> 2. Put Flume behind a profile, remove examples, but don't deprecate
>> >> 3. Punt until Spark 3.0, when this integration would probably be
>> >> removed entirely (?)
>> >>
>> >> On Tue, Sep 26, 2017 at 10:36 AM Sean Owen  wrote:
>> >>>
>> >>> Not a big deal, but I'm wondering whether Flume integration should at
>> >>> least be opt-in and behind a profile? It still sees some use (at
>> >>> least on our end) but not applicable to the majority of users. Most
>> >>> other third-party framework integrations are behind a profile, like
>> >>> YARN, Mesos, Kinesis, Kafka 0.8, Docker. Just soliciting comments,
>> >>> not arguing for it.
>> >>>
>> >>> (Well, actually it annoys me that the Flume integration always fails
>> >>> to compile in IntelliJ unless you generate the sources manually)
>>
>>


Re: Flume integration

2016-11-21 Thread Ian Brooks
Hi Mich,

Thanks. I would prefer not to add another system into the mix, as we currently 
don't use Kafka at all. We are still in the prototype phase at the moment and 
it seems to be working well, though it doesn't handle restarting the Flume 
sink part without also restarting the Spark application. That is something we 
should be able to manage, though.



*-Ian *


Hi Ian,


Flume is great for ingesting data into HDFS and HBase. However, that is part of 
the batch layer.


For real-time processing, I would go through Kafka into Spark Streaming. Apart from 
your case, I have not established whether anyone else feeds Flume directly into Spark.


If so, how mature is it?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 21 November 2016 at 10:27, Ian Brooks  wrote:




*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend 
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream data 
than Spark Streaming, so that we should chain them? For example, does it help the 
durability or reliability of the source?
Or is it a more tactical choice based on connector availability or such?
To me, Flume is an important component for ingesting streams into HDFS or Hive directly, 
i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer
 
Mobile: +44 7900987187[6]
UK Office: +44 131 629 5155[7]
US Office: +1 650 943 2403[8]
Skype: ijbrooks
 
E-mail: _i.brooks@sensewhere.com_ 
Web: www.sensewhere.com[9] 
 
*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA. 
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806
 
  




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.bro...@sensewhere.com[10] 
Web: www.sensewhere.com[9] 

*sensewhere Ltd*. 4th 

Re: Flume integration

2016-11-21 Thread Mich Talebzadeh
Hi Ian,

Flume is great for ingesting data into HDFS and HBase. However, that is
part of the batch layer.

For real-time processing, I would go through Kafka into Spark Streaming.
Apart from your case, I have not established whether anyone else feeds Flume
directly into Spark.

If so, how mature is it?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 November 2016 at 10:27, Ian Brooks  wrote:

>
> We use Flume already as our way of getting data from our application into
> HDFS and HBase. We have some new work we are looking at that requires
> realtime processing on data that we don't need to store, so it fits into
> our existing platform more easily just to pass the data through Flume like
> everything else and just route this data to Spark.
>
> -Ian
>
>
>
>
> On Monday 21 November 2016 07:59:42 ayan guha wrote:
>
> Hi
>
> While I am following this discussion with interest, I am trying to
> comprehend any architectural benefit of a Spark sink.
>
> Is there any feature in Flume that makes it more suitable for ingesting
> stream data than Spark Streaming, so that we should chain them? For
> example, does it help the durability or reliability of the source?
>
> Or is it a more tactical choice based on connector availability or such?
>
> To me, Flume is an important component for ingesting streams into HDFS or
> Hive directly, i.e. it plays on the batch side of the lambda architecture
> pattern.
>
> On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:
>
> Hi Ian,
>
>
> Has this been resolved?
>
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> This is initially working fine for getting data from Flume; however, the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
> Duration batchInterval = new Duration(1);
> final String checkpointDir = "/tmp/";
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
> ssc.checkpoint(checkpointDir);
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createPollingStream(ssc, host, port);
>
> // Transform each flume avro event to a process-able format
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
> @Override
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
> String flumeEventStr = flumeEvent.event().toString();
> avroData avroData = new avroData();
> Gson gson = new GsonBuilder().create();
> avroData = gson.fromJson(flumeEventStr, avroData.class);
> HashMap<String, String> body = avroData.getBody();
> String data = body.get("bytes");
> return data;
> }
> });
>
> ...
>
> ssc.start();
> ssc.awaitTermination();
> ssc.close();
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>
>
>
>
>
> --
>
> Ian Brooks
>
> Lead Cloud Systems Engineer
>
>
>
> Mobile: +44 7900987187
>
> UK Office: +44 131 629 5155
>
> US Offi

Re: Flume integration

2016-11-21 Thread Ian Brooks

*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend 
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream data 
than Spark Streaming, so that we should chain them? For example, does it help the 
durability or reliability of the source?
Or is it a more tactical choice based on connector availability or such?
To me, Flume is an important component for ingesting streams into HDFS or Hive directly, 
i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[2]/
 
  
http://talebzadehmich.wordpress.com[3]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.bro...@sensewhere.com[6] 
Web: www.sensewhere.com[7] 

*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806

  


[1] mailto:mich.talebza...@gmail.com
[2] 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[3] http://talebzadehmich.wordpress.com
[4] mailto:i.bro...@sensewhere.com
[5] https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
[6] mailto:i.bro...@sensewhere.com
[7] http://www.sensewhere.com/


Re: Flume integration

2016-11-20 Thread ayan guha
Hi

While I am following this discussion with interest, I am trying to
comprehend any architectural benefit of a Spark sink.

Is there any feature in Flume that makes it more suitable for ingesting
stream data than Spark Streaming, so that we should chain them? For example,
does it help the durability or reliability of the source?

Or is it a more tactical choice based on connector availability or such?

To me, Flume is an important component for ingesting streams into HDFS or
Hive directly, i.e. it plays on the batch side of the lambda architecture
pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:

> Hi Ian,
>
> Has this been resolved?
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
>> Hi,
>>
>>
>>
>> I'm currently trying to implement a prototype Spark application that gets
>> data from Flume and processes it. I'm using the pull based method mentioned
>> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>
>>
>>
>> This is initially working fine for getting data from Flume; however, the
>> Spark client doesn't appear to be letting Flume know that the data has been
>> received, so Flume doesn't remove it from the batch.
>>
>>
>>
>> After 100 requests Flume stops allowing any new data and logs
>>
>>
>>
>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>> Error while processing transaction.
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.
>> doTake(MemoryChannel.java:96)
>>
>>
>>
>> My code to pull the data from Flume is
>>
>>
>>
>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>> Duration batchInterval = new Duration(1);
>> final String checkpointDir = "/tmp/";
>>
>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> batchInterval);
>> ssc.checkpoint(checkpointDir);
>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>> FlumeUtils.createPollingStream(ssc, host, port);
>>
>> // Transform each flume avro event to a process-able format
>> JavaDStream<String> transformedEvents = flumeStream.map(new
>> Function<SparkFlumeEvent, String>() {
>>
>> @Override
>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>> String flumeEventStr = flumeEvent.event().toString();
>> avroData avroData = new avroData();
>> Gson gson = new GsonBuilder().create();
>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>> HashMap<String, String> body = avroData.getBody();
>> String data = body.get("bytes");
>> return data;
>> }
>> });
>>
>> ...
>>
>> ssc.start();
>> ssc.awaitTermination();
>> ssc.close();
>> }
>>
>>
>>
>> Is there something specific I should be doing to let the Flume server
>> know the batch has been received and processed?
>>
>>
>> --
>>
>> Ian Brooks
>>
>>
>>
>
>


Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Thanks Ian.

Was your Flume source IBM MQ, by any chance?



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 16:40, Ian Brooks  wrote:

> Hi Mich,
>
>
>
> Yes, I managed to resolve this one. The issue was that the way described
> in the docs doesn't work properly: in order for the Flume part to be
> notified, you need to set the storageLevel on the PollingStream, like
>
>
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createPollingStream(ssc, addresses,
> StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);
>
>
>
>
>
> After setting this, the data is correctly marked as processed by the Spark
> receiver and the Flume sink is notified.
>
>
>
> -Ian
>
>
>
>
>
> > Hi Ian,
> >
> > Has this been resolved?
> >
> > How about data to Flume and then Kafka and Kafka streaming into Spark?
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn *
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > *
> >
> > http://talebzadehmich.wordpress.com
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising
> > from such loss, damage or destruction.
> >
> > On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> > > Hi,
> > >
> > > I'm currently trying to implement a prototype Spark application that
> > > gets data from Flume and processes it. I'm using the pull based method
> > > mentioned in
> > > https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
> > >
> > > This is initially working fine for getting data from Flume; however,
> > > the Spark client doesn't appear to be letting Flume know that the data
> > > has been received, so Flume doesn't remove it from the batch.
> > >
> > > After 100 requests Flume stops allowing any new data and logs
> > >
> > > 08 Jul 2016 14:59:00,265 WARN [Spark Sink Processor Thread - 5]
> > > (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) -
> > > Error while processing transaction.
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction,
> > > capacity 100 full, consider committing more frequently, increasing
> > > capacity, or increasing thread count
> > > at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
> > >
> > > My code to pull the data from Flume is
> > >
> > > SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
> > > Duration batchInterval = new Duration(1);
> > > final String checkpointDir = "/tmp/";
> > >
> > > final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> > > batchInterval);
> > > ssc.checkpoint(checkpointDir);
> > > JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> > > FlumeUtils.createPollingStream(ssc, host, port);
> > >
> > > // Transform each flume avro event to a process-able format
> > > JavaDStream<String> transformedEvents = flumeStream.map(new
> > > Function<SparkFlumeEvent, String>() {
> > >
> > > @Override
> > > public String call(SparkFlumeEvent flumeEvent) throws Exception {
> > > String flumeEventStr = flumeEvent.event().toString();
> > > avroData avroData = new avroData();
> > > Gson gson = new GsonBuilder().create();
> > > avroData = gson.fromJson(flumeEventStr, avroData.class);
> > > HashMap<String, String> body = avroData.getBody();
> > > String data = body.get("bytes");
> > > return data;
> > > }
> > > });
> > >
> > > ...
> > >
> > > ssc.start();
> > > ssc.awaitTermination();
> > > ssc.close();
> > > }
> > >
> > > Is there something specific I should be doing to let the Flume server
> > > know the batch has been received and processed?
> > >
> > > --
> > > Ian Brooks
>
>
>
>
> --
>

Re: Flume integration

2016-11-20 Thread Ian Brooks
Hi Mich,

Yes, I managed to resolve this one. The issue was that the way described in 
the docs doesn't work properly: in order for the Flume part to be notified, 
you need to set the storageLevel on the PollingStream, like

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, addresses, 
StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);


After setting this, the data is correctly marked as processed by the Spark 
receiver and the Flume sink is notified.
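
For anyone who finds this later, here is a minimal, self-contained sketch of the 
pull-based setup with the storage level set explicitly. The host, port, batch size 
and parallelism values below are illustrative placeholders, not the exact values 
from our deployment:

import java.net.InetSocketAddress;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePollingSketch {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf(true).setAppName("FlumePollingSketch");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        // Address(es) of the Flume agent(s) running the Spark sink (placeholder values).
        InetSocketAddress[] addresses = new InetSocketAddress[] {
                new InetSocketAddress("flume-host", 9999)
        };

        // Passing the StorageLevel (plus max batch size and parallelism) explicitly
        // is what made the receiver commit the Flume transaction in our case.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, addresses,
                        StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);

        // Count events per batch just to confirm data is flowing and being acknowledged.
        flumeStream.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}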

-Ian


> Hi Ian,
> 
> Has this been resolved?
> 
> How about data to Flume and then Kafka and Kafka streaming into Spark?
> 
> Thanks
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABU
> rV8Pw
>  UrV8Pw>*
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 

Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Hi Ian,

Has this been resolved?

How about data to Flume and then Kafka and Kafka streaming into Spark?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 July 2016 at 11:13, Ian Brooks  wrote:

> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> This is initially working fine for getting data from Flume; however, the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
> Duration batchInterval = new Duration(1);
> final String checkpointDir = "/tmp/";
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
> ssc.checkpoint(checkpointDir);
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createPollingStream(ssc, host, port);
>
> // Transform each flume avro event to a process-able format
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
> @Override
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
> String flumeEventStr = flumeEvent.event().toString();
> avroData avroData = new avroData();
> Gson gson = new GsonBuilder().create();
> avroData = gson.fromJson(flumeEventStr, avroData.class);
> HashMap<String, String> body = avroData.getBody();
> String data = body.get("bytes");
> return data;
> }
> });
>
> ...
>
> ssc.start();
> ssc.awaitTermination();
> ssc.close();
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>


Flume integration

2016-07-13 Thread Ian Brooks
Hi,

I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html 

This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 

After 100 requests Flume stops allowing any new data and logs

08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 


My code to pull the data from Flume is

SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);

final String checkpointDir = "/tmp/";

final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);


// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {

@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception 
{
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();

Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class);
 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
 
return data;
}
});


...

ssc.start();
ssc.awaitTermination();
ssc.close();
}


Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*



Re: query on Spark + Flume integration using push model

2015-07-10 Thread diplomatic Guru
Hi Akhil, thank you for your reply. Does that mean that the original Spark
Streaming only supports Avro? If that's the case, then why only Avro? Is there
a particular reason?

The project linked is for Scala but I'm using Java. Is there another
project?


On 10 July 2015 at 08:46, Akhil Das  wrote:

> Here's an example https://github.com/przemek1990/spark-streaming
>
> Thanks
> Best Regards
>
> On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru 
> wrote:
>
>> Hello all,
>>
>> I'm trying to configure Flume to push data into a sink so that my
>> stream job can pick up the data. My events are in JSON format, but the
>> "Spark + Flume integration" [1] document only refers to an Avro sink.
>>
>> [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html
>>
>> I looked at some of the examples online, and they all refer to the avro type:
>>
>> agent.sinks.avroSink.type = avro
>>
>> If I set the type to avro and send the data in JSON, will it work? I'm
>> unable to try this because the stream job throws an Avro
>> 'org.apache.flume.source.avro.AvroFlumeEvent' exception.
>>
>> Please advise on how to handle this situation.
>>
>>
>> many thanks
>>
>
>


Re: query on Spark + Flume integration using push model

2015-07-10 Thread Akhil Das
Here's an example https://github.com/przemek1990/spark-streaming

Thanks
Best Regards

On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru 
wrote:

> Hello all,
>
> I'm trying to configure Flume to push data into a sink so that my
> stream job can pick up the data. My events are in JSON format, but the
> "Spark + Flume integration" [1] document only refers to an Avro sink.
>
> [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html
>
> I looked at some of the examples online, and they all refer to the avro type:
>
> agent.sinks.avroSink.type = avro
>
> If I set the type to avro and send the data in JSON, will it work? I'm
> unable to try this because the stream job throws an Avro
> 'org.apache.flume.source.avro.AvroFlumeEvent' exception.
>
> Please advise on how to handle this situation.
>
>
> many thanks
>


query on Spark + Flume integration using push model

2015-07-09 Thread diplomatic Guru
Hello all,

I'm trying to configure Flume to push data into a sink so that my
stream job can pick up the data. My events are in JSON format, but the
"Spark + Flume integration" [1] document only refers to an Avro sink.

[1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

I looked at some of the examples online, and they all refer to the avro type:

agent.sinks.avroSink.type = avro

If I set the type to avro and send the data in JSON, will it work? I'm
unable to try this because the stream job throws an Avro
'org.apache.flume.source.avro.AvroFlumeEvent' exception.

Please advise on how to handle this situation.


many thanks
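
To illustrate the receiving side only (not a confirmed answer), here is a rough
sketch of how the JSON body of a Flume event could be decoded in Spark, assuming
the events arrive as SparkFlumeEvent objects from FlumeUtils.createStream and the
event body bytes are UTF-8 JSON. The MyEvent class and its fields are made up for
this example:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class JsonEventDecoder {

    // Hypothetical POJO matching the JSON payload; adjust the fields to the real schema.
    public static class MyEvent {
        public String id;
        public String message;
    }

    // Turn a stream of Flume events into a stream of parsed JSON objects.
    public static JavaDStream<MyEvent> decode(JavaReceiverInputDStream<SparkFlumeEvent> flumeStream) {
        return flumeStream.map(new Function<SparkFlumeEvent, MyEvent>() {
            @Override
            public MyEvent call(SparkFlumeEvent flumeEvent) throws Exception {
                // The Flume event body is opaque bytes; this assumes it is UTF-8 JSON.
                ByteBuffer body = flumeEvent.event().getBody();
                byte[] bytes = new byte[body.remaining()];
                body.get(bytes);
                String json = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(json, MyEvent.class);
            }
        });
    }
}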


Re: Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread dapooley
Great, thanks guys, that helped a lot and I've got a sample working.

As a follow-up, when do workers/masters become a necessity?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879p10908.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread Hari Shreedharan

Hi,

Deploying Spark with Flume is pretty simple. What you'd need to do is:

1. Start your Spark Flume DStream receiver on some machine using one of 
the FlumeUtils.createStream methods - where you need to specify the 
hostname and port of the worker node on which you want the Spark 
executor to run - say a.b.c.d:4585. This is where Spark will receive 
the data from Flume. (A rough sketch of this step follows below.)


2. Once your application has started, start the Flume agent(s) which are 
going to be sending the data, with Avro sinks with the hostname set to 
a.b.c.d and the port set to 4585.


And you are done!
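
A rough sketch of step 1 in Java, under the assumption that the placeholder 
address a.b.c.d:4585 above is where the receiver should listen; the Flume Avro 
sink's hostname and port must match whatever is passed to createStream:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePushSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("FlumePushSketch");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

        // Spark starts a receiver listening on this host:port; the Flume agent's
        // Avro sink should be configured with the same hostname and port.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, "a.b.c.d", 4585);

        // Count events per batch just to confirm data is arriving from Flume.
        flumeStream.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}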

Tathagata Das wrote:


Hari, can you help?

TD

On Tue, Jul 29, 2014 at 12:13 PM, dapooley  wrote:


Hi,

I am trying to integrate Spark onto a Flume log sink and avro source. The
sink is on one machine (the application), and the source is on another. Log
events are being sent from the application server to the avro source server
(a log directory sink on the avro source prints to verify).

The aim is to get Spark to also receive the same events that the avro source
is getting. The steps, I believe, are:

1. install/start the Spark master (on the avro source machine).
2. write the Spark application, deploy it (on the avro source machine).
3. add the Spark application as a worker to the master.
4. have the Spark application configured to the same port as the avro source

Test setup is using 2 ubuntu VMs on a Windows host.

Flume configuration:

# application ##
## Tail application log file
# /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f
conf/flume-conf.properties
# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = tomcat
source_agent.sources.tomcat.type = exec
source_agent.sources.tomcat.command = tail -F
/var/lib/tomcat/logs/application.log
source_agent.sources.tomcat.batchSize = 1
source_agent.sources.tomcat.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.0.2.2
source_agent.sinks.avro_sink.port = 41414


 avro source ##
## Receive Flume events for Spark streaming

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
agent1.channels = memoryChannel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100

## Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-source
agent1.sources = avroSource
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.channels = memoryChannel
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 41414

#Sinks
agent1.sinks = localout

#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
agent1.sinks.localout.type = file_roll
agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs
agent1.sinks.localout.sink.rollInterval = 0
agent1.sinks.localout.channel = memoryChannel

thank you in advance for any assistance,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Tathagata Das <mailto:tathagata.das1...@gmail.com>
July 29, 2014 at 1:52 PM
Hari, can you help?

TD


--

Thanks,
Hari


Re: Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread Tathagata Das
Hari, can you help?

TD

On Tue, Jul 29, 2014 at 12:13 PM, dapooley  wrote:
> Hi,
>
> I am trying to integrate Spark onto a Flume log sink and avro source. The
> sink is on one machine (the application), and the source is on another. Log
> events are being sent from the application server to the avro source server
> (a log directory sink on the avro source prints to verify)
>
> The aim is to get Spark to also receive the same events that the avro source
> is getting. The steps, I believe, are:
>
> 1. install/start the Spark master (on the avro source machine).
> 2. write the Spark application, deploy it (on the avro source machine).
> 3. add the Spark application as a worker to the master.
> 4. have the Spark application configured to the same port as the avro source
>
> Test setup is using 2 ubuntu VMs on a Windows host.
>
> Flume configuration:
>
> # application ##
> ## Tail application log file
> # /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f
> conf/flume-conf.properties
> # http://flume.apache.org/FlumeUserGuide.html#exec-source
> source_agent.sources = tomcat
> source_agent.sources.tomcat.type = exec
> source_agent.sources.tomcat.command = tail -F
> /var/lib/tomcat/logs/application.log
> source_agent.sources.tomcat.batchSize = 1
> source_agent.sources.tomcat.channels = memoryChannel
>
> # http://flume.apache.org/FlumeUserGuide.html#memory-channel
> source_agent.channels = memoryChannel
> source_agent.channels.memoryChannel.type = memory
> source_agent.channels.memoryChannel.capacity = 100
>
> ## Send to Flume Collector on Analytics Node
> # http://flume.apache.org/FlumeUserGuide.html#avro-sink
> source_agent.sinks = avro_sink
> source_agent.sinks.avro_sink.type = avro
> source_agent.sinks.avro_sink.channel = memoryChannel
> source_agent.sinks.avro_sink.hostname = 10.0.2.2
> source_agent.sinks.avro_sink.port = 41414
>
>
>  avro source ##
> ## Receive Flume events for Spark streaming
>
> # http://flume.apache.org/FlumeUserGuide.html#memory-channel
> agent1.channels = memoryChannel
> agent1.channels.memoryChannel.type = memory
> agent1.channels.memoryChannel.capacity = 100
>
> ## Flume Collector on Analytics Node
> # http://flume.apache.org/FlumeUserGuide.html#avro-source
> agent1.sources = avroSource
> agent1.sources.avroSource.type = avro
> agent1.sources.avroSource.channels = memoryChannel
> agent1.sources.avroSource.bind = 0.0.0.0
> agent1.sources.avroSource.port = 41414
>
> #Sinks
> agent1.sinks = localout
>
> #http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
> agent1.sinks.localout.type = file_roll
> agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs
> agent1.sinks.localout.sink.rollInterval = 0
> agent1.sinks.localout.channel = memoryChannel
>
> thank you in advance for any assistance,
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread dapooley
Hi,

I am trying to integrate Spark onto a Flume log sink and avro source. The
sink is on one machine (the application), and the source is on another. Log
events are being sent from the application server to the avro source server
(a log directory sink on the avro source prints to verify)

The aim is to get Spark to also receive the same events that the avro source
is getting. The steps, I believe, are:

1. install/start the Spark master (on the avro source machine).
2. write the Spark application, deploy it (on the avro source machine).
3. add the Spark application as a worker to the master.
4. have the Spark application configured to the same port as the avro source

Test setup is using 2 ubuntu VMs on a Windows host.

Flume configuration:

# application ##
## Tail application log file
# /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f
conf/flume-conf.properties
# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = tomcat
source_agent.sources.tomcat.type = exec
source_agent.sources.tomcat.command = tail -F
/var/lib/tomcat/logs/application.log
source_agent.sources.tomcat.batchSize = 1
source_agent.sources.tomcat.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.0.2.2
source_agent.sinks.avro_sink.port = 41414


 avro source ##
## Receive Flume events for Spark streaming

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
agent1.channels = memoryChannel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100

## Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-source
agent1.sources = avroSource
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.channels = memoryChannel
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 41414

#Sinks
agent1.sinks = localout

#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
agent1.sinks.localout.type = file_roll
agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs
agent1.sinks.localout.sink.rollInterval = 0
agent1.sinks.localout.channel = memoryChannel

thank you in advance for any assistance,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.