On Tue, Apr 10, 2018 at 1:26 AM, José Ribeiro <[email protected] > wrote:
> One thing, if I use the thread-safe queues, it's to use the sync or async > client? If a parallel thread is listening to the queue, I can use the sync > client, right? > Yep, that's correct. No need for the async client in that scenario. - Dan > > -José > ------------------------------ > *De:* Dan Burkert <[email protected]> > *Enviado:* 9 de abril de 2018 18:32:43 > > *Para:* [email protected] > *Assunto:* Re: AsyncKudu > > Hi José, > > I would consider doing this a little bit differently. The only > circumstances I'm aware of where it's necessary to use the async session is > when you are attempting to write to Kudu from another async context where > you can't block, and you need to handle the results of the write. That > doesn't seem to necessarily be the case here, though. Instead, I'd > consider doing the following: > > - If possible, re-use Session instances and configure > AUTO_FLUSH_BACKGROUND. This will make the throughput significantly better, > since it will internally batch up writes before flushing. With > AUTO_FLUSH_SYNC it sends an RPC for every upsert, which is going to be much > slower. Note, however, that Session is not a threadsafe class. > > - If you absolutely have to prevent blocking on your main thread, then > create a worker-thread which reads the rows to upsert from a queue and > writes them to Kudu. Have the main thread simply add the appropriate rows > to the queue. There are a number of good thread-safe queues in the Java > standard library that are appropriate. > > Doing the first, or the first and second should be easier than messing > with the async client, as well as more performant. > > - Dan > > On Mon, Apr 9, 2018 at 10:16 AM, José Ribeiro < > [email protected]> wrote: > > Thank you for the reply. But I got it to write on Kudu. > > Still, I don't know why, but I get this error: > > > "java.lang.AssertionError: Tried to resume the execution of > Deferred@2021942318(state=DONE, result=true, callback=<none>, > errback=<none>)) although it's not in state=PAUSED. This occurred after > the completion of Deferred@1753159143(state=RUNNING, result=Written with > success., callback=<none>, errback=<none>) which was originally returned by > callback=com.foo.kudu.KuduDAOImpl$1@3e69008f@1047068815" > > The only way I discover to reach kudu was like this: > > *public void asyncWrite(String tableName, Foo foo) *{ > Deferred<Deferred<Boolean>> res = new Deferred<>(); > > res.addCallback(new Callback<Deferred<Deferred<String>>, > Deferred<Boolean>>() { > @Override > *public Deferred<Deferred<String>> call(Deferred<Boolean> > booleanDeferred)*{ > return booleanDeferred.addCallbacks(new > Callback<Deferred<String>, Boolean>() { > @Override > *public Deferred<String> call(Boolean aBoolean) > throws Exception* { > if (aBoolean) { > Deferred<String> stringDeferred = > kuduClient.openTable(tableName) > .addCallbacks(new Callback<String, > KuduTable>() { > @Override > *public String call(KuduTable > kuduTable) throws KuduException* { > AsyncKuduSession session = > kuduClient.newSession(); > session.setFlushMode(SessionCo > nfiguration.FlushMode.AUTO_FLUSH_SYNC); > > Upsert upst = > kuduTable.newUpsert(); > PartialRow row = > upst.getRow(); > row.addString("house_id", > foo.getId()); > > session.apply(upst); > session.close(); > LOGGER.info("Written with > success."); > return "Written with > success."; > } > }, (Callback<Object, KuduException>) > e -> { > LOGGER.error(e.getMessage()); > return e; > }); > return stringDeferred; > } else { > throw new Exception("Table doesn't exists"); > } > > } > }, (Callback<Object, Exception>) e -> { > System.out.println(e.getMessage()); > return e; > }); > } > }); > > //executing the callback > res.callback(kuduClient.tableExists(tableName)); > } > > > A little of the background of my project. The clients read and write on > other Database, and when they write something, the same information is sent > to Kudu. I don't want to block the client with the Kudu part, because the > client only needs the other DataBase to work. > > José Ribeiro > > ------------------------------ > *De:* Dan Burkert <[email protected]> > *Enviado:* 9 de abril de 2018 17:21 > *Para:* [email protected] > *Assunto:* Re: AsyncKudu > > Hi José, > > The Deferred class is indeed pretty difficult to come to grips with, which > is why we don't really recommend the async API for most use cases. I've > personally found the Deferred > <https://github.com/OpenTSDB/async/blob/master/src/Deferred.java> class > docs to be pretty useful when getting up to speed with it. Using the > AsyncKuduSession is particularly tricky because it doesn't expose > backpressure in an intuitive way, and it becomes very easy to accidentally > have a huge number of writes buffered in the client heap (the sync client > solves this by simply blocking so more rows can't be produced). > > kudu-ts > <https://github.com/danburkert/kudu-ts/blob/master/core/src/main/java/org/kududb/ts/core/KuduTS.java> > has a little bit of code that uses the async client to open and scan > tables, so it may be helpful to look over that. It doesn't use the async > session, I think because it was easier just to use the sync client for > writing. > > - Dan > > On Mon, Apr 9, 2018 at 3:10 AM, José Ribeiro < > [email protected]> wrote: > > Hello, I'm having difficulties in the AsyncKudu API. Since there isn't any > valuable examples with asynchronous kudu client, can you please help me in > this example? > > > I want to use AsyncKudu to write in the kudu database. The steps i want to > use are: > > 1. verify that the table exists; > 2. if exists, open table; > 3. write on the table; > > > Can someone explain me how to do this with AsyncKudu? I'm having trouble > in understanding the Deferred class that is always returned in every > asyncKudu method. Thank you. > > > >
