Re: Streaming a large file onto a channel
Hi Adrian,

What exactly is the issue you're facing? I wrote my own version and it seems to be working fine. Please take a look; I hope it helps.

    (defn process-file [ch file]
      (async/thread
        (with-open [input (io/reader file)]
          (doseq [line (line-seq input)]
            (async/>!! ch line)))))

    (defn parse [line]
      (str "Parsed: " line)) ; change this to do whatever you want

    (defn mapping [ch]
      (async/map parse [ch]))

    (defn start []
      (let [events (mapping (async/chan))]
        (process-file events "10_events.json")
        (async/go-loop []
          (let [v (async/<! events)]
            (println v)
            (recur)))))

About your approach: it seems a legitimate use of core.async to me. Please send us your impressions once you finish.

Cheers,

On Tuesday, 17 March 2015 09:52:17 UTC-3, Adrian Mowat wrote:

--
You received this message because you are subscribed to the Google Groups Clojure group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups Clojure group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Re: Streaming a large file onto a channel
Hi Erick,

Thanks for getting back to me. On my system, I wasn't seeing the contents of my file listed in the REPL. Your code works fine, though, and I can't see anything significantly different, so I wonder if I had managed to corrupt my session in some way. Anyway, it's good to know I'm on the right path. I'll post my solutions as I get things up and running.

Cheers

Adrian

On Wednesday, 18 March 2015 13:45:33 UTC, Erick Pintor wrote:
Re: Streaming a large file onto a channel
It's possible you are simply not seeing the println output from a background thread, depending on how your REPL etc. is set up.

On Wed, 18 Mar 2015 3:19 pm Adrian Mowat adrian.mo...@gmail.com wrote:
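If that is the cause, one common workaround is to capture the REPL's *out* at the top level and rebind it inside background work, since println in async/thread or go blocks uses whatever *out* is bound to where the code runs, which may not be the REPL's writer. A minimal sketch (the log helper is a hypothetical name, not from the thread's code):

```clojure
(def repl-out *out*) ; evaluated at the REPL, so this captures the REPL's writer

(defn log
  "Like println, but always writes to the REPL's *out*,
  even when called from async/thread or a go block."
  [& args]
  (binding [*out* repl-out]
    (apply println args)))
```

Replacing println with log inside the go loop should make the output visible in the REPL regardless of which thread runs it.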
Re: Streaming a large file onto a channel
Hi Adam,

I'm using the latest version of CIDER + cider-nrepl, but it's a possibility. I suspect it's more a case of my having tried so many different combinations that I polluted my REPL beyond repair. My fault for not just using components from the outset :-(

Thanks

Adrian

Sent from my iPhone

On 18 Mar 2015, at 18:57, Adam Clements adam.cleme...@gmail.com wrote:
Re: Streaming a large file onto a channel
It seems like you're generally on the right track here (though Erick Pintor's code has some nice cleanup, like removal of the unnecessary do, etc.). The one thing I'd recommend is testing what happens with a larger channel buffer: if the file IO isn't the bottleneck, but rather the processing, a larger buffer could improve concurrency by making sure there's always something ready to be taken and worked on. It would be cool to see some metrics on that for your use case.

Best,

Chris

On Tuesday, March 17, 2015 at 5:52:17 AM UTC-7, Adrian Mowat wrote:
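Trying a larger buffer is a one-line change to the spike. A sketch (1024 is an arbitrary size to experiment with, and parse-line here is a stand-in for the real JSON parsing):

```clojure
(require '[clojure.core.async :as async :refer [chan]])

(defn parse-line [s] s) ; stand-in for the real JSON parsing

;; the original spike used (chan 1 (map parse-line));
;; a larger fixed buffer lets the reader thread run ahead of the consumers:
(def events (chan 1024 (map parse-line)))
```

Measuring throughput at a few buffer sizes should show whether the reader thread or the downstream processing is the bottleneck.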
Streaming a large file onto a channel
Hi,

I've played around with core.async a bit, but now I'm trying to use it for a real project and I'm running into a problem getting data off a file and into a channel on the JVM (i.e. as opposed to ClojureScript).

I have around 1GB of data sitting in a file. Each line of the file contains a separate JSON document. There are different types of document in the file, and I would like to use core.async to set up a pipeline of concurrent operations as follows, so I can start processing the data before I've finished reading the file:

1. Stream the raw data out of the file one line at a time, parse it as JSON, and write each line to channel (1)
2. Read channel (1), divide the messages up by type, and write them to new channels (2..n)
3. Read channels (2..n) and apply business logic as appropriate

I'd like the initial read to run in its own thread because it will be IO blocking. The others can run in core.async's thread pool.

I'm running into problems getting channels (1) and (2) to talk to one another. Here's my initial spike; I would expect it to write the 10 lines of JSON from the example file to stdout.

    (defn file-to-chan [ch file]
      (do
        (async/thread
          (with-open [rdr (io/reader file)]
            (doseq [line (line-seq rdr)]
              (>!! ch line)))
          ch)))

    (defn parse-line [s]
      (json/parse-string s (comp keyword str/lower-case)))

    (def events (chan 1 (map parse-line)))

    (go
      (while true
        (println (<! events))))

    (file-to-chan events "10_events.json")

I have a few questions...

* Can anyone help me understand what's going wrong? (I'm sure it's something silly, but I'm going cross-eyed looking at it)
* It's effectively a batch process. Is this an appropriate use case for core.async?
* If so, am I on the right track, or is there a better way to approach this?

Many Thanks

Adrian
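For step 2, dividing the messages up by type, core.async's pub/sub is one idiomatic option. A sketch, assuming each parsed event is a map whose type lives under a :type key (the key and topic names are assumptions for illustration, not from the original post):

```clojure
(require '[clojure.core.async :as async :refer [chan pub sub go-loop <!]])

(def events (chan 1024))         ; channel (1), carrying parsed events
(def by-type (pub events :type)) ; routes each event by (:type event)

;; channels (2..n): one subscription per document type
(def orders (chan 16))
(def clicks (chan 16))
(sub by-type :order orders)
(sub by-type :click clicks)

;; step 3: apply business logic per type
(go-loop []
  (when-some [o (<! orders)]
    (println "order:" o)
    (recur)))
```

One thing to watch: a pub drops events whose topic has no matching subscriber, so set up the subscriptions before feeding the file onto events.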