This is an automated email from the ASF dual-hosted git repository. vitalii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new c58d8a080c DRILL-8248: Fix http_request for several rows (#2573) c58d8a080c is described below commit c58d8a080ccdf2b1361159de8c792906770be5f8 Author: Vitalii Diravka <vita...@apache.org> AuthorDate: Wed Jun 15 11:31:01 2022 +0300 DRILL-8248: Fix http_request for several rows (#2573) --- .../exec/store/http/udfs/HttpHelperFunctions.java | 82 ++++++++++++---------- .../drill/exec/store/http/udfs/HttpUdfUtils.java | 56 ++++++++------- .../exec/store/http/TestHttpUDFFunctions.java | 35 ++++++++- .../storage-http/src/test/resources/data/p4.json | 2 + .../store/kafka/decoders/JsonMessageReader.java | 22 +----- .../physical/impl/project/ProjectRecordBatch.java | 19 ++--- .../resultSet/impl/ResultSetLoaderImpl.java | 3 +- .../store/easy/json/loader/JsonLoaderImpl.java | 15 ++++ .../easy/json/loader/SingleElementIterator.java | 45 ++++++++++++ 9 files changed, 182 insertions(+), 97 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java index 5e73b716ca..a95bc9ff60 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java @@ -52,55 +52,57 @@ public class HttpHelperFunctions { OptionManager options; @Inject - ResultSetLoader loader; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; + + @Workspace + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream; @Override public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() - .resultSetLoader(loader) - .standardOptions(options); + stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); + rsLoader.startBatch(); } @Override public void eval() { // Get the URL String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer); - // Process Positional Arguments java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders); // If the arg list is null, indicating at least one null arg, return an empty map // as an approximation of null-if-null handling. if (args == null) { - // Return empty map return; } - String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args); - // Make the API call java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl); - // If the result string is null or empty, return an empty map if (results == null) { - // Return empty map return; } - try { - jsonLoaderBuilder.fromStream(results); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); + stream.setValue(results); + if (jsonLoader == null) { + jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(rsLoader, options, stream); + } + org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + if (jsonLoader.parser().next()) { + rowWriter.save(); + } } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); + throw org.apache.drill.common.exceptions.UserException.dataReadError(e) + .message("Error while reading JSON. ") + .addContext(e.getMessage()) + .build(); } } } - @FunctionTemplate(names = {"http_request", "httpRequest"}, scope = FunctionTemplate.FunctionScope.SIMPLE, isVarArg = true) @@ -122,10 +124,10 @@ public class HttpHelperFunctions { DrillbitContext drillbitContext; @Inject - ResultSetLoader loader; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Workspace org.apache.drill.exec.store.http.HttpStoragePlugin plugin; @@ -133,6 +135,9 @@ public class HttpHelperFunctions { @Workspace org.apache.drill.exec.store.http.HttpApiConfig endpointConfig; + @Workspace + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream; + @Override public void setup() { String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer); @@ -154,10 +159,9 @@ public class HttpHelperFunctions { endpointName, plugin.getConfig() ); - + stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); // Add JSON configuration from Storage plugin, if present. - jsonLoaderBuilder = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.setupJsonBuilder(endpointConfig, loader, options); - + rsLoader.startBatch(); } @Override @@ -167,30 +171,30 @@ public class HttpHelperFunctions { // If the arg list is null, indicating at least one null arg, return an empty map // as an approximation of null-if-null handling. if (args == null) { - // Return empty map return; } - - java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall( - plugin, - endpointConfig, - drillbitContext, - args - ).getInputStream(); - + java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args) + .getInputStream(); // If the result string is null or empty, return an empty map if (results == null) { - // Return empty map return; } - try { - jsonLoaderBuilder.fromStream(results); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); + stream.setValue(results); + if (jsonLoader == null) { + // Add JSON configuration from Storage plugin, if present. + jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(endpointConfig, rsLoader, options, stream); + } + org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + if (jsonLoader.parser().next()) { + rowWriter.save(); + } } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); + throw org.apache.drill.common.exceptions.UserException.dataReadError(e) + .message("Error while reading JSON. ") + .addContext(e.getMessage()) + .build(); } } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java index 93f078f7e2..69dc859a35 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java @@ -21,42 +21,50 @@ package org.apache.drill.exec.store.http.udfs; import org.apache.commons.lang3.StringUtils; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; -import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator; import org.apache.drill.exec.store.http.HttpApiConfig; -import org.apache.drill.exec.store.http.HttpJsonOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.InputStream; + public class HttpUdfUtils { private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class); - public static JsonLoaderBuilder setupJsonBuilder(HttpApiConfig endpointConfig, ResultSetLoader loader, OptionManager options) { - loader.setTargetRowCount(1); + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + SingleElementIterator<InputStream> stream) { + return createJsonLoader(null, rsLoader, options, stream); + } + public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader, + OptionManager options, SingleElementIterator<InputStream> stream) { // Add JSON configuration from Storage plugin, if present. - HttpJsonOptions jsonOptions = endpointConfig.jsonOptions(); - JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() - .resultSetLoader(loader) - .maxRows(1) - .standardOptions(options); - + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder = + new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .standardOptions(options) + .fromStream(() -> stream); // Add data path if present - if (StringUtils.isNotEmpty(endpointConfig.dataPath())) { - jsonLoaderBuilder.dataPath(endpointConfig.dataPath()); - } - - if (jsonOptions != null) { - // Add options from endpoint configuration to jsonLoader - JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options); - jsonLoaderBuilder.options(jsonLoaderOptions); + if (endpointConfig != null) { + if (StringUtils.isNotEmpty(endpointConfig.dataPath())) { + jsonLoaderBuilder.dataPath(endpointConfig.dataPath()); + } + // Add JSON configuration from Storage plugin, if present. + org.apache.drill.exec.store.http.HttpJsonOptions jsonOptions = endpointConfig.jsonOptions(); + if (jsonOptions != null) { + // Add options from endpoint configuration to jsonLoader + org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options); + jsonLoaderBuilder.options(jsonLoaderOptions); - // Add provided schema if present - if (jsonOptions.schema() != null) { - logger.debug("Found schema: {}", jsonOptions.schema()); - jsonLoaderBuilder.providedSchema(jsonOptions.schema()); + // Add provided schema if present + if (jsonOptions.schema() != null) { + logger.debug("Found schema: {}", jsonOptions.schema()); + jsonLoaderBuilder.providedSchema(jsonOptions.schema()); + } } } - return jsonLoaderBuilder; + return (org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl) jsonLoaderBuilder.build(); } } diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java index 2cbe8d4b85..c5901b9087 100644 --- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java +++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.http.udfs.HttpUdfUtils; import org.apache.drill.exec.store.http.util.SimpleHttp; import org.apache.drill.exec.store.security.UsernamePasswordCredentials; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; @@ -68,7 +69,7 @@ public class TestHttpUDFFunctions extends ClusterTest { private static String TEST_JSON_PAGE1; private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT; protected static LogFixture logFixture; - private final static Level CURRENT_LOG_LEVEL = Level.INFO; + private final static Level CURRENT_LOG_LEVEL = Level.DEBUG; @BeforeClass public static void setup() throws Exception { @@ -79,6 +80,7 @@ public class TestHttpUDFFunctions extends ClusterTest { .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL) .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL) .logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL) + .logger(HttpUdfUtils.class, CURRENT_LOG_LEVEL) .build(); startCluster(ClusterFixture.builder(dirTestWatcher)); TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read(); @@ -115,7 +117,7 @@ public class TestHttpUDFFunctions extends ClusterTest { configs.put("basicJson", basicJson); HttpStoragePluginConfig mockStorageConfigWithWorkspace = - new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "", + new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "", 80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of( UsernamePasswordCredentials.USERNAME, "globaluser", UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name()); @@ -130,6 +132,8 @@ public class TestHttpUDFFunctions extends ClusterTest { server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1)); RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertEquals(1, results.rowCount()); + TupleMetadata expectedSchema = new SchemaBuilder() .addMap("data") .addNullable("col_1", MinorType.FLOAT8) @@ -147,6 +151,33 @@ public class TestHttpUDFFunctions extends ClusterTest { } } + @Test + public void testSeveralRowsAndRequests() throws Exception { + String sql = "SELECT http_request('local.basicJson', `col1`) as data FROM cp.`/data/p4.json`"; + try (MockWebServer server = startServer()) { + server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1)); + server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1)); + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + assertEquals(2, results.rowCount()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addMap("data") + .addNullable("col_1", MinorType.FLOAT8) + .addNullable("col_2", MinorType.FLOAT8) + .addNullable("col_3", MinorType.FLOAT8) + .resumeSchema() + .build(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(singleMap(mapValue(1.0, 2.0, 3.0))) + .addRow(singleMap(mapValue(4.0, 5.0, 6.0))) + .build(); + + RowSetUtilities.verify(expected, results); + } + } + @Test public void testHttpGetWithNoParams() throws Exception { try (MockWebServer server = startServer()) { diff --git a/contrib/storage-http/src/test/resources/data/p4.json b/contrib/storage-http/src/test/resources/data/p4.json new file mode 100644 index 0000000000..43a890b171 --- /dev/null +++ b/contrib/storage-http/src/test/resources/data/p4.json @@ -0,0 +1,2 @@ +{"col1": "apache"} +{"col1": "ddr"} diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java index a9aee5a990..ab135ade82 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions; +import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator; import org.apache.drill.exec.store.easy.json.parser.TokenIterator; import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; import org.apache.drill.exec.store.kafka.MetaDataField; @@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.util.Iterator; import java.util.Properties; import java.util.StringJoiner; @@ -156,24 +156,4 @@ public class JsonMessageReader implements MessageReader { .add("resultSetLoader=" + resultSetLoader) .toString(); } - - public static class SingleElementIterator<T> implements Iterator<T> { - private T value; - - @Override - public boolean hasNext() { - return value != null; - } - - @Override - public T next() { - T value = this.value; - this.value = null; - return value; - } - - public void setValue(T value) { - this.value = value; - } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 89e1beb6ea..a23227a275 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.project; +import org.apache.commons.collections4.CollectionUtils; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.types.TypeProtos; @@ -110,7 +111,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { memoryManager.update(); if (first && incomingRecordCount == 0) { - if (complexWriters != null || rsLoader != null ) { + if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null ) { IterOutcome next = null; while (incomingRecordCount == 0) { if (getLastKnownOutcome() == EMIT) { @@ -145,7 +146,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } } - if ((complexWriters != null || rsLoader != null) && getLastKnownOutcome() == EMIT) { + if ((!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) && getLastKnownOutcome() == EMIT) { throw UserException.unsupportedError() .message("Currently functions producing complex types as output are not " + "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " + @@ -177,7 +178,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. - if (rsLoader != null && !rsLoader.isProjectionEmpty()) { + if (rsLoader != null) { MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class); map.setMapValueCount(recordCount); for (VectorWrapper<?> vectorWrapper : rsLoader.harvest()) { @@ -185,7 +186,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { map.putChild(valueVector.getField().getName(), valueVector); } container.buildSchema(SelectionVectorMode.NONE); - } else if (complexWriters != null) { + } else if (!CollectionUtils.isEmpty(complexWriters)) { container.buildSchema(SelectionVectorMode.NONE); } @@ -224,7 +225,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. - if (complexWriters != null || rsLoader != null) { + if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { container.buildSchema(SelectionVectorMode.NONE); } @@ -358,7 +359,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) { // In a case of complex writers vectors are added at runtime, so the schema // may change (e.g. when a batch contains new column(s) not present in previous batches) - if (complexWriters != null || rsLoader != null) { + if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { return IterOutcome.OK_NEW_SCHEMA; } return super.getFinalOutcome(hasMoreRecordInBoundary); @@ -374,11 +375,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } allocationVectors = new ArrayList<>(); - if (complexWriters != null) { - container.clear(); - } else if (rsLoader != null) { + if (rsLoader != null) { container.clear(); rsLoader.close(); + } else if (!CollectionUtils.isEmpty(complexWriters)) { + container.clear(); } else { // Release the underlying DrillBufs and reset the ValueVectors to empty // Not clearing the container here is fine since Project output schema is diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java index 10cca0b648..dd5973c482 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java @@ -434,8 +434,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals { throw new IllegalStateException("Unexpected state: " + state); } - // Update the visible schema with any pending overflow batch - // updates + // Update the visible schema with any pending overflow batch updates harvestSchemaVersion = activeSchemaVersion; pendingRowCount = 0; batchSizeLimit = (int) Math.min(targetRowCount, options.scanLimit - totalRowCount()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index e5755cb07b..7e9d9d541e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -20,10 +20,14 @@ package org.apache.drill.exec.store.easy.json.loader; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import io.netty.buffer.DrillBuf; +import org.apache.commons.io.IOUtils; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.EmptyErrorContext; import org.apache.drill.common.exceptions.UserException; @@ -43,6 +47,7 @@ import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJso import org.apache.drill.exec.store.easy.json.parser.ValueDef; import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType; import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; +import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -189,6 +194,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory { return this; } + public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) { + this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf)); + return this; + } + + public JsonLoaderBuilder fromString(String jsonString) { + this.streams = Collections.singletonList(IOUtils.toInputStream(jsonString, Charset.defaultCharset())); + return this; + } + public JsonLoaderBuilder fromReader(Reader reader) { this.reader = reader; return this; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java new file mode 100644 index 0000000000..6073c0e06e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.json.loader; + +import java.util.Iterator; + +/** + * It allows setting the current value in the iterator and can be used once after {@link #next} call + * + * @param <T> type of the value + */ +public class SingleElementIterator<T> implements Iterator<T> { + private T value; + + @Override + public boolean hasNext() { + return value != null; + } + + @Override + public T next() { + T value = this.value; + this.value = null; + return value; + } + + public void setValue(T value) { + this.value = value; + } + }