Also the hashJoin is going to read the entire entity table into memory. If
that's a large index that could be using lots of memory.

25 million docs should be ok to /export from one node, as long as you have
enough memory to load the docValues for the fields for sorting and
exporting.

Breaking down the query into it's parts will show where the issue is. Also
adding more heap might give you enough memory.

In my testing the max docs per second I've seen the /export handler push
from a single node is 650,000. In order to get 650,000 docs per second on
one node you have to partition the stream with workers. In my testing it
took 8 workers hitting one node to achieve the 650,000 docs per second.

But the numbers get big as the cluster grows. With 20 shards and 4 replicas
and 32 workers, you could export 52,000,000 docs per-second. With 40
shards, 5 replicas and 40 workers you could export 130,000,000 docs per
second.

So with large clusters you could do very large distributed joins with
sub-second performance.




Joel Bernstein
http://joelsolr.blogspot.com/

On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com> wrote:

> Thanks very much for the advice.  Yes, I'm running in a very basic single
> shard environment.  I thought that 25M docs was small enough to not require
> anything special but I will try scaling like you suggest and let you know
> what happens.
>
> Cheers, Ryan
>
> On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com>
> wrote:
>
> > I would try breaking down the second query to see when the problems
> occur.
> >
> > 1) Start with just a single *:* search from one of the collections.
> > 2) Then test the innerJoin. The innerJoin won't take much memory as it's
> a
> > streaming merge join.
> > 3) Then try the full thing.
> >
> > If you're running a large join like this all on one host then you might
> not
> > have enough memory for the docValues and the two joins. In general
> > streaming is designed to scale by adding servers. It scales 3 ways:
> >
> > 1) Adding shards, splits up the index for more pushing power.
> > 2) Adding workers, partitions the streams and splits up the join / merge
> > work.
> > 3) Adding replicas, when you have workers you will add pushing power by
> > adding replicas. This is because workers will fetch partitions of the
> > streams from across the entire cluster. So ALL replicas will be pushing
> at
> > once.
> >
> > So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You can
> > perform massive joins quickly.
> >
> > But for you're scenario and available hardware you can experiment with
> > different cluster sizes.
> >
> >
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com>
> wrote:
> >
> > > qt="/export" immediately fixed the query in Question #1.  Sorry for
> > missing
> > > that in the docs!
> > >
> > > The second query (with /export) crashes the server so I was going to
> look
> > > at parallelization if you think that's a good idea.  It also seems
> unwise
> > > to joining into 26M docs so maybe I can reconfigure the query to run
> > along
> > > a more happy path :-)  The schema is very RDBMS-centric so maybe that
> > just
> > > won't ever work in this framework.
> > >
> > > Here's the log but it's not very helpful.
> > >
> > >
> > > INFO  - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
> > > x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
> > > [triple_shard1_replica1]  webapp=/solr path=/export
> > >
> > >
> >
> params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
> > > hits=26305619 status=0 QTime=61
> > >
> > > INFO  - 2016-05-13 23:18:13.747; [c:triple_type s:shard1 r:core_node1
> > > x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore;
> > > [triple_type_shard1_replica1]  webapp=/solr path=/export
> > >
> > >
> >
> params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
> > > hits=702 status=0 QTime=2
> > >
> > > INFO  - 2016-05-13 23:18:48.504; [   ]
> > > org.apache.solr.common.cloud.ConnectionManager; Watcher
> > > org.apache.solr.common.cloud.ConnectionManager@6ad0f304
> > > name:ZooKeeperConnection Watcher:localhost:9983 got event WatchedEvent
> > > state:Disconnected type:None path:null path:null type:None
> > >
> > > INFO  - 2016-05-13 23:18:48.504; [   ]
> > > org.apache.solr.common.cloud.ConnectionManager; zkClient has
> disconnected
> > >
> > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
> > > x:triple_shard1_replica1] org.apache.solr.common.SolrException;
> > null:Early
> > > Client Disconnect
> > >
> > > WARN  - 2016-05-13 23:18:51.431; [   ]
> > > org.apache.zookeeper.ClientCnxn$SendThread; Session 0x154ac66c81e0002
> for
> > > server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing socket
> > > connection and attempting reconnect
> > >
> > > java.io.IOException: Connection reset by peer
> > >
> > >         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > >
> > >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > >
> > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > >
> > >         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > >
> > >         at
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> > >
> > >         at
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> > >
> > >         at
> > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> > >
> > >         at
> > > org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > >
> > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <joels...@gmail.com>
> > > wrote:
> > >
> > > > A couple of other things:
> > > >
> > > > 1) Your innerJoin can parallelized across workers to improve
> > performance.
> > > > Take a look at the docs on the parallel function for the details.
> > > >
> > > > 2) It looks like you might be doing graph operations with joins. You
> > > might
> > > > to take a look at the gatherNodes function coming in 6.1:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
> > > >
> > > > Joel Bernstein
> > > > http://joelsolr.blogspot.com/
> > > >
> > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <joels...@gmail.com>
> > > > wrote:
> > > >
> > > > > When doing things that require all the results (like joins) you
> need
> > to
> > > > > specify the /export handler in the search function.
> > > > >
> > > > > qt="/export"
> > > > >
> > > > > The search function defaults to the /select handler which is
> designed
> > > to
> > > > > return the top N results. The /export handler always returns all
> > > results
> > > > > that match the query. Also keep in mind that the /export handler
> > > requires
> > > > > that sort fields and fl fields have docValues set.
> > > > >
> > > > > Joel Bernstein
> > > > > http://joelsolr.blogspot.com/
> > > > >
> > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <ryancut...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> Question #1:
> > > > >>
> > > > >> triple_type collection has a few hundred docs and triple has 25M
> > docs.
> > > > >>
> > > > >> When I search for a particular subject_id in triple which I know
> has
> > > 14
> > > > >> results and do not pass in 'rows' params, it returns 0 results:
> > > > >>
> > > > >> innerJoin(
> > > > >>     search(triple, q=subject_id:1656521,
> > > > >> fl="triple_id,subject_id,type_id",
> > > > >> sort="type_id asc"),
> > > > >>     search(triple_type, q=*:*,
> > fl="triple_type_id,triple_type_label",
> > > > >> sort="triple_type_id asc"),
> > > > >>     on="type_id=triple_type_id"
> > > > >> )
> > > > >>
> > > > >> When I do the same search with rows=10000, it returns 14 results:
> > > > >>
> > > > >> innerJoin(
> > > > >>     search(triple, q=subject_id:1656521,
> > > > >> fl="triple_id,subject_id,type_id",
> > > > >> sort="type_id asc", rows=10000),
> > > > >>     search(triple_type, q=*:*,
> > fl="triple_type_id,triple_type_label",
> > > > >> sort="triple_type_id asc", rows=10000),
> > > > >>     on="type_id=triple_type_id"
> > > > >> )
> > > > >>
> > > > >> Am I doing this right?  Is there a magic number to pass into rows
> > > which
> > > > >> says "give me all the results which match this query"?
> > > > >>
> > > > >>
> > > > >> Question #2:
> > > > >>
> > > > >> Perhaps related to the first question but I want to run the
> > > innerJoin()
> > > > >> without the subject_id - rather have it use the results of another
> > > > query.
> > > > >> But this does not return any results.  I'm saying "search for this
> > > > entity
> > > > >> based on id then use that result's entity_id as the subject_id to
> > look
> > > > >> through the triple/triple_type collections:
> > > > >>
> > > > >> hashJoin(
> > > > >>     innerJoin(
> > > > >>         search(triple, q=*:*, fl="triple_id,subject_id,type_id",
> > > > >> sort="type_id asc"),
> > > > >>         search(triple_type, q=*:*,
> > > > fl="triple_type_id,triple_type_label",
> > > > >> sort="triple_type_id asc"),
> > > > >>         on="type_id=triple_type_id"
> > > > >>     ),
> > > > >>     hashed=search(entity,
> > > > >> q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
> > > > >> fl="entity_id,entity_label", sort="entity_id asc"),
> > > > >>     on="subject_id=entity_id"
> > > > >> )
> > > > >>
> > > > >> Am I using doing this hashJoin right?
> > > > >>
> > > > >> Thanks very much, Ryan
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to