Re: [akka-user] Database query does not cancel when streaming rows using a DatabasePublisher

2017-02-01 Thread Mark Goldenstein
Hi Johan,

Thank you for the quick response! Indeed, in my application I do create the 
DatabasePublisher anew for each request. This is a mistake I introduced 
when I stripped down the code for this forum post. Sorry about this.

The problem I have is that even when the http connection is closed, the 
database query still continues streaming. I would assume it should stop 
since there is no downstream demand anymore when the connection closes... I 
am not sure whether this is a problem related to slick or if my streaming 
setup is flawed.

Mark


On Thursday, February 2, 2017 at 12:18:38 AM UTC+1, Akka Team wrote:
>
> From the slick docs:
>
> Execution of the DBIOAction does not start until a Subscriber is attached 
> to the stream. Only a singleSubscriber is supported, and any further 
> attempts to subscribe again will fail ...
>
>
> I'm guessing what you actually want is not to create your publisher once 
> upon application start and subscribe to that multiple time like you do now 
> but rather once for every request that comes in to webservice/csv (so 
> inside the block of the get directive.
>
> -- 
> Johan
> Akka Team
>
> On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein <bigan...@gmail.com 
> > wrote:
>
>> Hi guys!
>>
>> (Cross-posting this since I did not get a response in the Slick group.)
>>
>> I use akka-http together with Slick to stream rows from a mysql database 
>> as csv via http.
>>
>> Streaming works as expected unless I disconnect the http connection 
>> prematurely. In this case I would expect that the sql query should stop 
>> executing since there is no downstream demand anymore. However, I can see 
>> in the mysql processes that the query is still running and it only stops 
>> when I shut down the webserver (or when the query finishes fetching all the 
>> rows).
>>
>> If I turn on debug logs I see repeatedly, while streaming:
>>
>> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending 
>> streaming action with continuation (more data available)
>> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG 
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after 
>> transition from demand = 0
>> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting 
>>  streaming action, realDemand = 8
>>
>> Then, once I disconnect the connection, and there is no downstream demand 
>> anymore, I see:
>>
>> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending 
>> streaming action with continuation (more data available)
>> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG 
>> akka.io.TcpIncomingConnection - Closing connection due to IO error 
>> java.io.IOException: Broken pipe
>> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG 
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after 
>> transition from demand = 0
>> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting 
>>  streaming action, realDemand = oo
>>
>> What is going on here? Why is realDemand oo?
>>
>> My code looks something like this:
>>
>> object Main extends App { 
>>   implicit val system = ActorSystem()
>>   implicit val executor = system.dispatcher
>>   implicit val materializer = ActorMaterializer()
>>
>>   implicit val csvMarshaller =
>> Marshaller.withFixedContentType[CSVLine, 
>> ByteString](ContentTypes.`text/csv(UTF-8)`) {
>>   case CSVLine(line) => ByteString(line)
>> }
>>
>>   implicit val csvStreamingSupport = EntityStreamingSupport.csv()
>> .withParallelMarshalling(parallelism = 8, unordered = false)
>>
>>   val query = ??? // a Slick query
>>
>>   val publisher: DatabasePublisher[Tick] = DB.get.stream(
>> query.result.withStatementParameters(statementInit = DB.enableStream))
>>
>>   val routes = {
>> logRequestResult("webservice") {
>>   encodeResponse {
>> pathPrefix("csv") {
>>   pathSingleSlash {
>> get {
>>   complete {
>> Source.fromPublisher(publisher).map(t => 
>> CSVLine(t.toCSV()))
>>   }
>> }
>>   }
>> }
>>   }
>> }
>>   }
>>
>>   Http().bindAndHandle(routes, "127.0.0.1", 9000)
>> }
>>
>> object DB {
>>   private val db = Database.forConfig("db")
>>
>>   def get = db
>>
>>   def enableStream(statement: java.sql.Stateme

[akka-user] Database query does not cancel when streaming rows using a DatabasePublisher

2017-02-01 Thread Mark Goldenstein
Hi guys!

(Cross-posting this since I did not get a response in the Slick group.)

I use akka-http together with Slick to stream rows from a mysql database as 
csv via http.

Streaming works as expected unless I disconnect the http connection 
prematurely. In this case I would expect that the sql query should stop 
executing since there is no downstream demand anymore. However, I can see 
in the mysql processes that the query is still running and it only stops 
when I shut down the webserver (or when the query finishes fetching all the 
rows).

If I turn on debug logs I see repeatedly, while streaming:

22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = 8

Then, once I disconnect the connection, and there is no downstream demand 
anymore, I see:

22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG 
akka.io.TcpIncomingConnection - Closing connection due to IO error 
java.io.IOException: Broken pipe
22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = oo

What is going on here? Why is realDemand oo?

My code looks something like this:

object Main extends App { 
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  implicit val csvMarshaller =
Marshaller.withFixedContentType[CSVLine, 
ByteString](ContentTypes.`text/csv(UTF-8)`) {
  case CSVLine(line) => ByteString(line)
}

  implicit val csvStreamingSupport = EntityStreamingSupport.csv()
.withParallelMarshalling(parallelism = 8, unordered = false)

  val query = ??? // a Slick query

  val publisher: DatabasePublisher[Tick] = DB.get.stream(
query.result.withStatementParameters(statementInit = DB.enableStream))

  val routes = {
logRequestResult("webservice") {
  encodeResponse {
pathPrefix("csv") {
  pathSingleSlash {
get {
  complete {
Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
  }
}
  }
}
  }
}
  }

  Http().bindAndHandle(routes, "127.0.0.1", 9000)
}

object DB {
  private val db = Database.forConfig("db")

  def get = db

  def enableStream(statement: java.sql.Statement): Unit = {
statement match {
  case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>

s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
  case _ => // do nothing
}
  }
}

Any thoughts on this?

Mark

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.