GitHub user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/127#discussion_r111409181
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---
    @@ -202,14 +203,56 @@ public String  withRyaIntegration(final String pcjId, final PrecomputedJoinStora
                                triplesBatch.clear();
                        }
     
    +=======
    +           try (Transaction tx = fluo.newTransaction()) {
    +                   // Write the query's structure to Fluo.
    +                   new FluoQueryMetadataDAO().write(tx, fluoQuery);
    +
    +                   // The results of the query are eventually exported to an instance
    +                   // of Rya, so store the Rya ID for the PCJ.
    +                   final String queryId = fluoQuery.getQueryMetadata().getNodeId();
    +                   tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
    +                   tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
    +
    +                   // Flush the changes to Fluo.
    +                   tx.commit();
    +           }
    +
    +           // Reuse the same set object while performing batch inserts.
    +           final Set<RyaStatement> queryBatch = new HashSet<>();
    +
    +           // Iterate through each of the statement patterns and insert their
    +           // historic matches into Fluo.
    +           for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
    +                   // Get an iterator over all of the binding sets that match the
    +                   // statement pattern.
    +                   final StatementPattern pattern = FluoStringConverter
    +                                   .toStatementPattern(patternMetadata.getStatementPattern());
    +                   queryBatch.add(spToRyaStatement(pattern));
    +           }
    +
    +           Iterator<RyaStatement> triples = queryEngine.query(new BatchRyaQuery(queryBatch)).iterator();
    +           Set<RyaStatement> triplesBatch = new HashSet<>();
    +
    +           // Insert batches of the binding sets into Fluo.
    +           while (triples.hasNext()) {
    +                   if (triplesBatch.size() == spInsertBatchSize) {
    +                           writeBatch(fluo, triplesBatch);
    +                           triplesBatch.clear();
    +                   }
    +
    +>>>>>>> RYA-222-Fixed Column Visibility Bug for Results Streamed into Fluo
    --- End diff --
    
    Done.