Hi Johan,
Thank you for the quick response! Indeed, in my actual application I do
create the DatabasePublisher anew for each request. The shared publisher in
the posted code is a mistake I introduced when I stripped down the code for
this forum post. Sorry about this.
The problem I have is that even when the http connection is closed, the
database query still continues streaming. I would assume it should stop,
since there is no downstream demand anymore once the connection closes... I
am not sure whether this is a problem related to Slick or whether my
streaming setup is flawed.
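To check whether cancellation reaches the source at all, a small probe along
these lines might help. It is only a sketch under assumptions: the endless
iterator stands in for the Slick publisher, `CancellationProbe` and
`firstRows` are names made up for illustration, `take(n)` imitates a client
that stops reading, and it assumes an Akka Streams version where
`ActorMaterializer` is the usual materializer.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of the application above.
object CancellationProbe {
  // Runs an endless source, cancels it after n elements, and returns the
  // collected elements together with the termination signal seen at the source.
  def firstRows(n: Int): (Seq[Int], Done) = {
    implicit val system: ActorSystem = ActorSystem("probe")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for Source.fromPublisher(DB.get.stream(...)).
      // watchTermination exposes a Future that completes when this stage
      // sees completion, failure, or cancellation from downstream.
      val rows: Source[Int, Future[Done]] =
        Source.fromIterator(() => Iterator.from(1))
          .watchTermination()(Keep.right)

      // take(n) cancels upstream after n elements, roughly what a closed
      // http connection should do to the response entity stream.
      val (terminated, collected) =
        rows.take(n).toMat(Sink.seq)(Keep.both).run()

      (Await.result(collected, 5.seconds), Await.result(terminated, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

If the same `watchTermination` stage were placed directly after
`Source.fromPublisher(...)` in the route, its Future completing on disconnect
would suggest that the cancel signal does reach the publisher and that the
remaining work happens inside Slick or the JDBC driver.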
Mark
On Thursday, February 2, 2017 at 12:18:38 AM UTC+1, Akka Team wrote:
>
> From the slick docs:
>
> Execution of the DBIOAction does not start until a Subscriber is attached
> to the stream. Only a single Subscriber is supported, and any further
> attempts to subscribe again will fail ...
>
>
> I'm guessing what you actually want is not to create your publisher once
> upon application start and subscribe to it multiple times, like you do now,
> but rather to create it once for every request that comes in to
> webservice/csv (so inside the block of the get directive).
>
> --
> Johan
> Akka Team
>
> On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein <bigan...@gmail.com
> > wrote:
>
>> Hi guys!
>>
>> (Cross-posting this since I did not get a response in the Slick group.)
>>
>> I use akka-http together with Slick to stream rows from a mysql database
>> as csv via http.
>>
>> Streaming works as expected unless I disconnect the http connection
>> prematurely. In this case I would expect that the sql query should stop
>> executing since there is no downstream demand anymore. However, I can see
>> in the mysql processes that the query is still running and it only stops
>> when I shut down the webserver (or when the query finishes fetching all the
>> rows).
>>
>> If I turn on debug logs I see repeatedly, while streaming:
>>
>> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending
>> streaming action with continuation (more data available)
>> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after
>> transition from demand = 0
>> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting
>> streaming action, realDemand = 8
>>
>> Then, once I disconnect the connection, and there is no downstream demand
>> anymore, I see:
>>
>> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending
>> streaming action with continuation (more data available)
>> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG
>> akka.io.TcpIncomingConnection - Closing connection due to IO error
>> java.io.IOException: Broken pipe
>> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after
>> transition from demand = 0
>> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting
>> streaming action, realDemand = oo
>>
>> What is going on here? Why is realDemand oo?
>>
>> My code looks something like this:
>>
>> object Main extends App {
>>   implicit val system = ActorSystem()
>>   implicit val executor = system.dispatcher
>>   implicit val materializer = ActorMaterializer()
>>
>>   implicit val csvMarshaller =
>>     Marshaller.withFixedContentType[CSVLine, ByteString](ContentTypes.`text/csv(UTF-8)`) {
>>       case CSVLine(line) => ByteString(line)
>>     }
>>
>>   implicit val csvStreamingSupport = EntityStreamingSupport.csv()
>>     .withParallelMarshalling(parallelism = 8, unordered = false)
>>
>>   val query = ??? // a Slick query
>>
>>   val publisher: DatabasePublisher[Tick] = DB.get.stream(
>>     query.result.withStatementParameters(statementInit = DB.enableStream))
>>
>>   val routes = {
>>     logRequestResult("webservice") {
>>       encodeResponse {
>>         pathPrefix("csv") {
>>           pathSingleSlash {
>>             get {
>>               complete {
>>                 Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
>>               }
>>             }
>>           }
>>         }
>>       }
>>     }
>>   }
>>
>>   Http().bindAndHandle(routes, "127.0.0.1", 9000)
>> }
>>
>> object DB {
>>   private val db = Database.forConfig("db")
>>
>>   def get = db
>>
>>   def enableStream(statement: java.sql.Stateme