[
https://issues.apache.org/jira/browse/ASTERIXDB-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yingyi Bu resolved ASTERIXDB-1206.
----------------------------------
Resolution: Fixed
> Hyracks level job rewriting results in single-thread-per-partition for binary
> input operators
> ---------------------------------------------------------------------------------------------
>
> Key: ASTERIXDB-1206
> URL: https://issues.apache.org/jira/browse/ASTERIXDB-1206
> Project: Apache AsterixDB
> Issue Type: Bug
> Components: Hyracks Core
> Reporter: Yingyi Bu
> Assignee: Yingyi Bu
>
> The following job will be run in a single thread for one partition.
> But the two initializer -- one for each input source operator should be run
> in two parallel threads.
> @Test
> public void testScanUnion() throws Exception {
> JobSpecification spec = new JobSpecification();
> IFileSplitProvider splitProvider1 = new ConstantFileSplitProvider(new
> FileSplit[] {
> new FileSplit(NC1_ID, new FileReference(new
> File("data/words.txt"))) });
> IFileSplitProvider splitProvider2 = new ConstantFileSplitProvider(new
> FileSplit[] {
> new FileSplit(NC2_ID, new FileReference(new
> File("data/words.txt"))) });
> RecordDescriptor desc = new RecordDescriptor(
> new ISerializerDeserializer[] { new
> UTF8StringSerializerDeserializer() });
> FileScanOperatorDescriptor csvScanner1 = new
> FileScanOperatorDescriptor(
> spec,
> splitProvider1,
> new DelimitedDataTupleParserFactory(new IValueParserFactory[]
> { UTF8StringParserFactory.INSTANCE },
> ','),
> desc);
> PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
> csvScanner1, NC1_ID);
> FileScanOperatorDescriptor csvScanner2 = new
> FileScanOperatorDescriptor(
> spec,
> splitProvider2,
> new DelimitedDataTupleParserFactory(new IValueParserFactory[]
> { UTF8StringParserFactory.INSTANCE },
> ','),
> desc);
> PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
> csvScanner2, NC2_ID);
> UnionAllOperatorDescriptor union = new
> UnionAllOperatorDescriptor(spec, 2, desc);
> PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, union,
> NC1_ID);
> IConnectorDescriptor connScan1ToUnion = new
> OneToOneConnectorDescriptor(spec);
> IConnectorDescriptor connScan2ToUnion = new
> OneToOneConnectorDescriptor(spec);
> spec.connect(connScan1ToUnion, csvScanner1, 0, union, 0);
> spec.connect(connScan2ToUnion, csvScanner2, 0, union, 1);
> IOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
> PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
> printer, NC1_ID);
> IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
> spec.connect(conn, union, 0, printer, 0);
> spec.addRoot(printer);
> runTest(spec);
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)