[ https://issues.apache.org/jira/browse/PIG-114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12587169#action_12587169 ]
Pi Song commented on PIG-114:
-----------------------------
Alan,
That block is basically just a caching hook. If the function that stored the
materialized result is not reversible (not a ReversibleLoadStoreFunc), execution
falls through to the code below, which compiles the operator as usual and does
not read the cached output. For the new plan compilation, I think something
like this should move to the optimization stage as an optional filter.
Here is the block with its surrounding context:
{code}
public OperatorKey compile(OperatorKey logicalKey,
                           Map<OperatorKey, LogicalOperator> logicalOpTable,
                           HExecutionEngine execEngine) throws IOException {
    // check to see if we have materialized results for the logical tree to
    // compile, if so, re-use them...
    //
    Map<OperatorKey, MapRedResult> materializedResults =
        execEngine.getMaterializedResults();
    MapRedResult materializedResult = materializedResults.get(logicalKey);
    if ( (materializedResult != null) &&
         (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec())
              instanceof ReversibleLoadStoreFunc) ) {
        POMapreduce pom = new POMapreduce(logicalKey.getScope(),
                                          nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                          execEngine.getPhysicalOpTable(),
                                          logicalKey,
                                          pigContext);
        pom.addInputFile(materializedResult.outFileSpec);
        pom.mapParallelism = Math.max(pom.mapParallelism,
                                      materializedResult.parallelismRequest);
        return pom.getOperatorKey();
    }
    // first, compile inputs into MapReduce operators
    OperatorKey[] compiledInputs =
        new OperatorKey[logicalOpTable.get(logicalKey).getInputs().size()];
    for (int i = 0; i < logicalOpTable.get(logicalKey).getInputs().size(); i++)
        compiledInputs[i] = compile(logicalOpTable.get(logicalKey).getInputs().get(i),
                                    logicalOpTable,
                                    execEngine);
    // then, compile this operator; if possible, merge with previous MapReduce
    // operator rather than introducing a new one
    LogicalOperator lo = logicalOpTable.get(logicalKey);
    if (lo instanceof LOEval) {
        // make a copy of the previous
        POMapreduce pom =
            ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]))
                .copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope()));
    // More and more and more plan compilation here
{code}
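To make the fall-through easier to see in isolation, here is a minimal,
self-contained sketch of the same decision. It is not the Pig code: the
interfaces, the Materialized class, and the string return values are all
hypothetical stand-ins chosen for illustration. Only the shape of the check
(reuse a cached result only when the function that wrote it can also read it
back) mirrors the block above.

```java
import java.util.HashMap;
import java.util.Map;

public class MaterializedReuseSketch {
    // Hypothetical stand-ins for the store/load function types.
    interface StoreFunc {}
    interface LoadFunc {}
    interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc {}

    // A function that can only write (comparable to DummyStoreFunc in the report).
    static class WriteOnlyStorage implements StoreFunc {}
    // A function that can write and read back its own output.
    static class RoundTripStorage implements ReversibleLoadStoreFunc {}

    // Hypothetical record of an earlier store: output path plus the function used.
    static final class Materialized {
        final String path;
        final StoreFunc storeFunc;

        Materialized(String path, StoreFunc storeFunc) {
            this.path = path;
            this.storeFunc = storeFunc;
        }
    }

    // Returns "reuse:<path>" when the cached output can be loaded again,
    // otherwise falls through and returns "compile:<alias>".
    static String compile(String alias, Map<String, Materialized> cache) {
        Materialized m = cache.get(alias);
        if (m != null && m.storeFunc instanceof ReversibleLoadStoreFunc) {
            return "reuse:" + m.path;
        }
        // No cached result, or it was written by a store-only function:
        // compile the operator normally and ignore the cached output.
        return "compile:" + alias;
    }

    public static void main(String[] args) {
        Map<String, Materialized> cache = new HashMap<>();
        cache.put("A", new Materialized("/tmp/a", new WriteOnlyStorage()));
        cache.put("B", new Materialized("/tmp/b", new RoundTripStorage()));

        System.out.println(compile("A", cache)); // store-only function, recompiled
        System.out.println(compile("B", cache)); // reversible function, reused
        System.out.println(compile("C", cache)); // never stored, compiled
    }
}
```

The instanceof test is what keeps a store-only function from ever being
treated as a loader, which is the cast that fails in the trace quoted below.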
> store one alias/logicalPlan twice leads to instantiation of StoreFunc as LoadFunc
> ---------------------------------------------------------------------------------
>
> Key: PIG-114
> URL: https://issues.apache.org/jira/browse/PIG-114
> Project: Pig
> Issue Type: Bug
> Components: impl
> Affects Versions: 0.0.0
> Reporter: Johannes Zillmann
> Assignee: Pi Song
> Fix For: 0.1.0
>
> Attachments: PIG114_FixOptimize1.patch,
> PIG114_FixOptimize_Sample.patch, pigPatch-storeTwice-620665.patch
>
>
> Calling PigServer#store() twice for an alias results in the following exception:
> {noformat}
> java.lang.RuntimeException: java.lang.ClassCastException: org.apache.pig.test.DummyStoreFunc cannot be cast to org.apache.pig.LoadFunc
>     at org.apache.pig.backend.local.executionengine.POLoad.<init>(POLoad.java:59)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:167)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:184)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:184)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:111)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:90)
>     at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:1)
>     at org.apache.pig.PigServer.store(PigServer.java:330)
>     at org.apache.pig.PigServer.store(PigServer.java:317)
>     at org.apache.pig.test.StoreTwiceTest.testIt(StoreTwiceTest.java:31)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:589)
>     at junit.framework.TestCase.runTest(TestCase.java:164)
>     at junit.framework.TestCase.runBare(TestCase.java:130)
>     at junit.framework.TestResult$1.protect(TestResult.java:110)
>     at junit.framework.TestResult.runProtected(TestResult.java:128)
>     at junit.framework.TestResult.run(TestResult.java:113)
>     at junit.framework.TestCase.run(TestCase.java:120)
>     at junit.framework.TestSuite.runTest(TestSuite.java:228)
>     at junit.framework.TestSuite.run(TestSuite.java:223)
>     at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
> Caused by: java.lang.ClassCastException: org.apache.pig.test.DummyStoreFunc cannot be cast to org.apache.pig.LoadFunc
>     at org.apache.pig.backend.local.executionengine.POLoad.<init>(POLoad.java:57)
>     ... 28 more
> {noformat}
> I will attach a patch with a test scenario for this. Basically, the code is as
> follows:
> {noformat}
> PigServer pig = new PigServer(ExecType.LOCAL);
> pig.registerQuery("A = LOAD 'test/org/apache/pig/test/StoreTwiceTest.java' USING "
>         + DummyLoadFunc.class.getName() + "();");
> pig.registerQuery("B = FOREACH A GENERATE * ;");
> File outputFile = new File("/tmp/testPigOutput");
> outputFile.delete();
> pig.store("A", outputFile.getAbsolutePath(), DummyStoreFunc.class.getName() + "()");
> outputFile.delete();
> pig.store("B", outputFile.getAbsolutePath(), DummyStoreFunc.class.getName() + "()");
> outputFile.delete();
> assertEquals(2, _storedTuples.size());
> {noformat}