Re: Streaming a large file onto a channel

2015-03-18 Thread Erick Pintor
Hi Adrian,

What exactly is the issue you're facing?
I wrote my own version and it seems to work fine.

Please take a look; I hope it helps.

(defn process-file [ch file]
  (async/thread
    (with-open [input (io/reader file)]
      (doseq [line (line-seq input)]
        (async/>!! ch line)))))

(defn parse [line]
  (str "Parsed: " line)) ; change it to do whatever you want

(defn mapping [ch]
  (async/map parse [ch]))

(defn start []
  (let [events (mapping (async/chan))]
    (process-file events "10_events.json")
    (async/go-loop []
      ;; when-let stops the loop once the channel closes
      (when-let [v (async/<! events)]
        (println v)
        (recur)))))
About your approach: it seems like a legitimate use of core.async to me.
Please share your impressions once you finish.

Cheers,


On Tuesday, 17 March 2015 at 09:52:17 UTC-3, Adrian Mowat wrote:








-- 
You received this message because you are subscribed to the Google
Groups Clojure group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
Clojure group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Re: Streaming a large file onto a channel

2015-03-18 Thread Adrian Mowat
Hi Erick

Thanks for getting back to me.  On my system, I wasn't seeing the contents 
of my file being listed in the REPL.  Your code is working fine though and 
I can't see anything significantly different so I wonder if I had managed 
to corrupt my session in some way.

Anyway, it's good to know I'm on the right path.  I'll post my solutions as 
I get things up and running

Cheers

Adrian



On Wednesday, 18 March 2015 13:45:33 UTC, Erick Pintor wrote:









Re: Streaming a large file onto a channel

2015-03-18 Thread Adam Clements
It's possible you are simply not seeing the println output from a
background thread, depending on how your repl etc is set up.
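
To expand on that: `*out*` is thread-bound in Clojure, so a `println` inside `async/thread` or a `go` block writes to whatever stream the pool thread inherited, which in an nREPL setup is often the server process's stdout rather than your REPL buffer. A minimal sketch of one workaround, capturing the caller's `*out*` (the helper name here is my own, not from the thread):

```clojure
(require '[clojure.core.async :as async])

(defn run-in-background
  "Run f on a core.async thread with *out* bound to the caller's
  *out*, so anything f prints shows up in the calling REPL.
  Returns a channel that yields f's return value."
  [f]
  (let [repl-out *out*]
    (async/thread
      (binding [*out* repl-out]
        (f)))))

;; Prints land in the REPL, and the result comes back on the channel:
(async/<!! (run-in-background #(do (println "visible in the REPL") :done)))
```

Alternatively, avoid printing from background threads entirely: push results onto a channel and take them with `<!!` on the REPL thread, where `*out*` is already the right stream.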

On Wed, 18 Mar 2015 3:19 pm Adrian Mowat adrian.mo...@gmail.com wrote:





Re: Streaming a large file onto a channel

2015-03-18 Thread Adrian Mowat
Hi Adam

I'm using the latest version of CIDER + cider-nrepl, but it's a possibility.  I 
suspect it's more that I tried so many different combinations I polluted my 
REPL beyond repair.  My fault for not just using components from the outset :-(

Thanks

Adrian 

Sent from my iPhone

 On 18 Mar 2015, at 18:57, Adam Clements adam.cleme...@gmail.com wrote:
 

Re: Streaming a large file onto a channel

2015-03-18 Thread Christopher Small
It seems like you're generally on the right track here (though Erick 
Pintor's code has some nice cleanup, like removal of the unnecessary `do`). 
The one thing I'd recommend is testing what happens with a larger channel 
buffer: if the processing, rather than the file IO, is the bottleneck, a 
bigger buffer can improve concurrency by making sure there's always 
something ready to be taken and worked on. It would be cool to see some 
metrics on that for your use case.
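
In the spike from the original post, that's a one-line change to the `chan` call. A sketch with a stand-in transducer in place of `parse-line` so it runs standalone (1024 is an arbitrary starting point; measure, then tune):

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])

;; A 1024-slot buffer lets the reader thread run well ahead of slow
;; consumers; the transducer is applied as each item enters the buffer.
(def events (async/chan 1024 (map str/upper-case)))

;; Puts don't block until the buffer fills:
(dotimes [i 3]
  (async/>!! events (str "line-" i)))

(async/<!! events) ;; => "LINE-0"
```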

Best

Chris


On Tuesday, March 17, 2015 at 5:52:17 AM UTC-7, Adrian Mowat wrote:










Streaming a large file onto a channel

2015-03-17 Thread Adrian Mowat
Hi,

I've played around with core.async a bit but now I'm trying to use it for a 
real project and I'm running into a problem getting data off a file and 
into a channel on the JVM (i.e. as opposed to ClojureScript)

I have around 1GB of data sitting in a file.  Each line of the file 
contains a separate JSON document.  There are different types of document 
in the file, and I would like to use core.async to set up a pipeline of 
concurrent operations as follows, so I can start processing the data before 
I've finished reading the file.

1. Stream the raw data out of the file one line at a time, parse it as JSON 
and write each line to channel (1)
2. Read channel (1) and divide the messages up by type and write them to 
new channels (2..n)
3. Read channels (2..n) and apply business logic as appropriate
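
Steps 2 and 3 above map naturally onto core.async's `pub`/`sub`. A rough sketch, assuming each parsed document carries a `:type` key (that field name is an assumption, not something from the post):

```clojure
(require '[clojure.core.async :as async :refer [chan pub sub go-loop <! >!!]])

;; Channel (1): parsed JSON maps arrive here.
(def parsed (chan 1024))

;; Route every message to a topic derived from its :type key.
(def by-type (pub parsed :type))

;; Channels (2..n): one subscription channel per document type.
(defn sub-type [topic]
  (let [ch (chan 1024)]
    (sub by-type topic ch)
    ch))

;; Step 3: attach business logic to a typed channel.
(defn handle! [ch f]
  (go-loop []
    (when-let [msg (<! ch)]
      (f msg)
      (recur))))
```

When `parsed` is closed at end of file, the pub closes its topic channels, so the `when-let` in each `go-loop` drains the remaining messages and stops.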

I'd like the initial read to run in its own thread because it will be IO 
blocking.  The others can run in core.async's thread pool.

I'm running into problems getting channels (1) and (2) to talk to one 
another.  Here's my initial spike; I would expect it to write the 10 
lines of JSON from the example file to stdout.

(defn file-to-chan [ch file]
  (do
    (async/thread
      (with-open [rdr (io/reader file)]
        (doseq [line (line-seq rdr)]
          (>!! ch line))))
    ch))

(defn parse-line [s]
  (json/parse-string s (comp keyword str/lower-case)))

(def events (chan 1 (map parse-line)))

(go
  (while true
    (println (<! events))))

(file-to-chan events "10_events.json")

I have a few questions...

* Can anyone help me understand what's going wrong? (I'm sure it's 
something silly, but I'm going cross-eyed looking at it)
* It's effectively a batch process.  Is this an appropriate use case for 
core.async?
* If so, am I on the right track or is there a better way to approach this?

Many Thanks

Adrian




