[GitHub] [arrow] fsaintjacques commented on a change in pull request #7030: ARROW-7808: [Java][Dataset] Implement Datasets Java API by JNI to C++

2020-06-12 Thread GitBox


fsaintjacques commented on a change in pull request #7030:
URL: https://github.com/apache/arrow/pull/7030#discussion_r439413765



##
File path: cpp/src/arrow/dataset/discovery.h
##
@@ -216,6 +216,16 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public 
DatasetFactory {
   std::shared_ptr filesystem, fs::FileSelector selector,
   std::shared_ptr format, FileSystemFactoryOptions options);
 
+  /// \brief Build a FileSystemDatasetFactory from an uri including filesystem
+  /// information.
+  ///
+  /// \param[in] uri passed to FileSystemDataset
+  /// \param[in] format passed to FileSystemDataset
+  /// \param[in] options see FileSystemFactoryOptions for more information.
+  static Result> Make(std::string uri,

Review comment:
   :+1: 

##
File path: 
java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.BufferLedger;
+import org.apache.arrow.memory.NativeUnderlingMemory;
+import org.apache.arrow.memory.Ownerships;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+/**
+ * Native implementation of {@link Scanner}. Note that it currently emits only 
a single scan task of type
+ * {@link NativeScanTask}, which is internally a combination of all scan task 
instances returned by the
+ * native scanner.
+ */
+public class NativeScanner implements Scanner {
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+  private final NativeContext context;
+  private final long scannerId;
+
+  public NativeScanner(NativeContext context, long scannerId) {
+this.context = context;
+this.scannerId = scannerId;
+  }
+
+  ScanTask.BatchIterator execute() {
+if (closed.get()) {
+  throw new NativeInstanceClosedException();
+}
+if (!executed.compareAndSet(false, true)) {
+  throw new UnsupportedOperationException("NativeScanner cannot be 
executed more than once. Consider creating " +
+  "new scanner instead");
+}
+return new ScanTask.BatchIterator() {
+  private ArrowRecordBatch peek = null;
+
+  @Override
+  public void close() {
+NativeScanner.this.close();
+  }
+
+  @Override
+  public boolean hasNext() {
+if (closed.get()) {

Review comment:
   What's the point of checking `closed` at the start of the method if any concurrent 
thread can come and close said object while this method is running? In other 
words, this doesn't protect against "use-after-close". Since `close()` will 
delete the `shared_ptr` underneath, you're very open to use-after-free in C++.
   
   I think you need to protect the close state with a read-write mutex, or 
document that the Scanner is single-threaded only, which it is in C++. The 
important part is that ScanTasks can be dispatched concurrently.

##
File path: 
java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or 

[GitHub] [arrow] fsaintjacques commented on a change in pull request #7030: ARROW-7808: [Java][Dataset] Implement Datasets Java API by JNI to C++

2020-05-26 Thread GitBox


fsaintjacques commented on a change in pull request #7030:
URL: https://github.com/apache/arrow/pull/7030#discussion_r430588931



##
File path: cpp/.gitignore
##
@@ -24,6 +24,8 @@ cmake_install.cmake
 build/
 *-build/
 Testing/
+cmake-build-debug/

Review comment:
   I didn't know this was the default behavior of CLion; in that case, your 
change is a sane default for developers. Do the following:
   
   ```
   # build directories created by clion
   cmake-build-*
   ```

##
File path: cpp/src/jni/dataset/jni_wrapper.cpp
##
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "org_apache_arrow_dataset_file_JniWrapper.h"
+#include "org_apache_arrow_dataset_jni_JniWrapper.h"
+
+static jclass illegal_access_exception_class;
+static jclass illegal_argument_exception_class;
+static jclass runtime_exception_class;
+
+static jclass record_batch_handle_class;
+static jclass record_batch_handle_field_class;
+static jclass record_batch_handle_buffer_class;
+
+static jmethodID record_batch_handle_constructor;
+static jmethodID record_batch_handle_field_constructor;
+static jmethodID record_batch_handle_buffer_constructor;
+
+static jint JNI_VERSION = JNI_VERSION_1_6;
+
+class JniPendingException : public std::runtime_error {
+ public:
+  explicit JniPendingException(const std::string& arg) : runtime_error(arg) {}
+};
+
+void ThrowPendingException(const std::string& message) {
+  throw JniPendingException(message);
+}
+
+template 
+T JniGetOrThrow(arrow::Result result) {
+  if (!result.status().ok()) {
+ThrowPendingException(result.status().message());
+  }
+  return std::move(result).ValueOrDie();
+}
+
+void JniAssertOkOrThrow(arrow::Status status) {
+  if (!status.ok()) {
+ThrowPendingException(status.message());
+  }
+}
+
+void JniThrow(std::string message) { ThrowPendingException(message); }
+
+#define JNI_METHOD_START try {
+// macro ended
+
+#define JNI_METHOD_END(fallback_expr) \
+  }   \
+  catch (JniPendingException & e) {   \
+env->ThrowNew(runtime_exception_class, e.what()); \
+return fallback_expr; \
+  }
+// macro ended
+
+jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
+  jclass local_class = env->FindClass(class_name);
+  jclass global_class = (jclass)env->NewGlobalRef(local_class);
+  env->DeleteLocalRef(local_class);
+  return global_class;
+}
+
+arrow::Result GetMethodID(JNIEnv* env, jclass this_class, const 
char* name,
+ const char* sig) {
+  jmethodID ret = env->GetMethodID(this_class, name, sig);
+  if (ret == nullptr) {
+std::string error_message = "Unable to find method " + std::string(name) +
+" within signature" + std::string(sig);
+return arrow::Status::Invalid(error_message);
+  }
+  return ret;
+}
+
+jint JNI_OnLoad(JavaVM* vm, void* reserved) {
+  JNIEnv* env;
+  if (vm->GetEnv(reinterpret_cast(), JNI_VERSION) != JNI_OK) {
+return JNI_ERR;
+  }
+  JNI_METHOD_START
+  illegal_access_exception_class =
+  CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
+  illegal_argument_exception_class =
+  CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;");
+  runtime_exception_class =
+  CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");
+
+  record_batch_handle_class =
+  CreateGlobalClassReference(env,
+ "Lorg/apache/arrow/"
+ "dataset/jni/NativeRecordBatchHandle;");
+  record_batch_handle_field_class =
+  CreateGlobalClassReference(env,
+ "Lorg/apache/arrow/"
+ "dataset/jni/NativeRecordBatchHandle$Field;");
+  record_batch_handle_buffer_class =
+  CreateGlobalClassReference(env,
+ "Lorg/apache/arrow/"
+ 
"dataset/jni/NativeRecordBatchHandle$Buffer;");
+
+  record_batch_handle_constructor =
+  

[GitHub] [arrow] fsaintjacques commented on a change in pull request #7030: ARROW-7808: [Java][Dataset] Implement Datasets Java API by JNI to C++

2020-05-22 Thread GitBox


fsaintjacques commented on a change in pull request #7030:
URL: https://github.com/apache/arrow/pull/7030#discussion_r426869642



##
File path: 
java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The JniLoader for Datasets API's native implementation.
+ */
+public final class JniLoader {
+  private static final JniLoader INSTANCE = new JniLoader();
+  private static final List LIBRARY_NAMES = 
Collections.singletonList("arrow_dataset_jni");
+
+  private AtomicBoolean loaded = new AtomicBoolean(false);
+
+  public static JniLoader get() {
+return INSTANCE;
+  }
+
+  private JniLoader() {
+  }
+
+  /**
+   * If required JNI libraries are not loaded, then load them.
+   */
+  public void ensureLoaded() {
+if (loaded.compareAndSet(false, true)) {

Review comment:
   It seems the logic should be to set `loaded` _after_ iterating over and 
loading said libraries. I don't know the concurrency requirements of 
`System.load`, so I can't comment. You could fix this by adding a single method:
   
   ```java
   public void ensureLoaded() {
 if (!loaded) {
   loadAll();
 }
   }
   
   public void synchronized loadAll() {
 // The method is protected by a mutex via synchronized, if more than 
 // one thread race to call loadAll, only one will do the actual loading 
and the
 // others will be a noop once they acquire the mutex.
 if (loaded) return;
 LIBRARY_NAMES.forEach(this::load);
 loaded.set(true);
   }
   ...

##
File path: cpp/src/jni/dataset/jni_wrapper.cpp
##
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "org_apache_arrow_dataset_file_JniWrapper.h"
+#include "org_apache_arrow_dataset_jni_JniWrapper.h"
+
+static jclass illegal_access_exception_class;
+static jclass illegal_argument_exception_class;
+static jclass runtime_exception_class;
+
+static jclass record_batch_handle_class;
+static jclass record_batch_handle_field_class;
+static jclass record_batch_handle_buffer_class;
+
+static jmethodID record_batch_handle_constructor;
+static jmethodID record_batch_handle_field_constructor;
+static jmethodID record_batch_handle_buffer_constructor;
+
+static jint JNI_VERSION = JNI_VERSION_1_6;
+
+class JniPendingException : public std::runtime_error {
+ public:
+  explicit JniPendingException(const std::string& arg) : runtime_error(arg) {}
+};
+
+void ThrowPendingException(const std::string& message) {
+  throw JniPendingException(message);
+}
+
+template 
+T JniGetOrThrow(arrow::Result result) {
+  if (!result.status().ok()) {
+ThrowPendingException(result.status().message());
+  }
+  return std::move(result).ValueOrDie();
+}
+
+void JniAssertOkOrThrow(arrow::Status status) {
+  if (!status.ok()) {
+ThrowPendingException(status.message());
+  }
+}
+
+void JniThrow(std::string message) { ThrowPendingException(message); }
+
+#define JNI_METHOD_START try {
+// macro ended
+
+#define