[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-07 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22952
  
BTW: [HADOOP-15748](https://issues.apache.org/jira/browse/HADOOP-15748), 
*S3 listing inconsistency can raise NPE in globber*

Could be backported to 2.8+; low risk


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-07 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22952
  
 > Looks like we can leverage GlobPattern but it is marked as @Private.

I will happily switch this from private to public/evolving if you submit a 
patch against hadoop-trunk, and backport it. The most recent changes to that class were 
in 2015 (!), HADOOP-12436, and Jan 2017, HADOOP-13976 (newline handling). Nobody is 
going to modify that class, out of fear of breaking things.

Much easier for me to review and commit than to write a patch myself and 
try to get it reviewed...


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-04 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22952
  
> GlobExpander is private

that's correctable. 

1. Make sure there are standalone tests (add them if there are none).
2. Make sure that, off filesystem.md, there's something declaring normatively 
exactly what it expands.
3. Provide a patch moving it from @Private to @Public/Evolving.

We can apply those changes to branch-2 and trunk, and, because the class is 
already public you can use it knowing that in the development line of hadoop 
we've committed to not moving, deleting or breaking it.

As usual: file a HADOOP-* jira with the patch, link me to it, I'll do my 
best to review


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22952
  
> HDFS does not support it yet, though on the way, see 
https://issues.apache.org/jira/browse/HADOOP-10019

That's an old patch; I don't know of any active dev there.


---




[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-12-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22952
  
Hadoop FS glob filtering is pathologically bad on object stores. 
I have tried in the past to do an ~O(1) impl for S3 
[HADOOP-13371](https://issues.apache.org/jira/browse/HADOOP-13371). While I 
could produce one which was efficient for test cases, it would suffer in the 
use case "selective pattern match at the top of a very wide tree", where you 
really do want to filter down aggressively for the topmost 
directory/directories.

I think there you'd want a threshold for how many path elements 
up you'd switch from ls dir + match to the full deep `listFiles(recursive)` 
scan.

I've not looked at it for ages. If someone does want to play there, they're welcome to 
take it up.
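
The threshold idea above could be sketched roughly as follows. This is a minimal, self-contained illustration rather than anything in Hadoop: `GlobPlanSketch`, `deepScanStart` and the `threshold` parameter are hypothetical names, and the wildcard detection is deliberately simplistic.

```scala
// Hypothetical sketch, not Hadoop code: decide where in a glob pattern to stop
// doing "list directory + match" and switch to one deep recursive listing.
object GlobPlanSketch {

  // Treat a path element as a wildcard if it contains a common glob metacharacter.
  private def isWildcard(element: String): Boolean =
    element.exists(c => "*?[{".indexOf(c) >= 0)

  /**
   * Index of the path element at which a deep scan would start.
   * Elements before the first wildcard are a literal prefix and need no listing;
   * wildcard elements before the returned index are matched level by level;
   * the remaining elements (at most `threshold` of them) are resolved by a
   * single recursive listing whose results are filtered against the pattern.
   * Returns the element count when the pattern has no wildcard (no scan needed).
   */
  def deepScanStart(pattern: String, threshold: Int): Int = {
    val elements = pattern.split('/').filter(_.nonEmpty)
    val firstWildcard = elements.indexWhere(isWildcard)
    if (firstWildcard < 0) elements.length
    else math.max(firstWildcard, elements.length - threshold)
  }
}
```

With the pattern `/data/year=*/month=*/day=*/*.csv` and a threshold of 2, this returns 3: `year=*` and `month=*` would be matched by listing one level at a time, and the final two elements by a deep listing under each surviving `month=` directory. Picking a good threshold is exactly the open question.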


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237206264
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

+ non-normative list of [what I think are the relevant JVM 
Settings](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java#L88)


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237203744
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

"java.security.auth.login.config"

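For illustration, one of those login possibilities is a JAAS configuration file named by this JVM system property. A minimal sketch of checking for it, where `JaasConfigSketch` and `configuredPath` are hypothetical names and not part of Spark or Kafka:

```scala
// Hypothetical helper: look up the JAAS login configuration file, if any,
// from the standard JVM system property.
object JaasConfigSketch {
  val JaasProperty = "java.security.auth.login.config"

  // Takes the properties as a parameter so it can be exercised without touching
  // the real JVM settings; defaults to the live system properties.
  def configuredPath(
      props: scala.collection.Map[String, String] = sys.props): Option[String] =
    props.get(JaasProperty).map(_.trim).filter(_.nonEmpty)
}
```

A caller could log whether `JaasConfigSketch.configuredPath()` is defined before attempting to obtain a token.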

---




[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...

2018-11-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21066
  
+1

One thing to consider here is to be ruthless when there are things in 
bits of the HDFS APIs/libraries which don't suit: rather than think "how do 
we work around this", think "what do we need to do to get this fixed". 

This includes (based on the HBase & Hive experiences)
* what's marked stable
* serialization of classes
* pulling up of operations from HDFS to the public FileSystem API (source 
of some contention there between myself and the hdfs team as to what 
constitutes acceptable specification and tests)
* thread safety (HBase & encrypted IO)
* various constants in HDFS interfaces tagged as private.
etc.

BTW, I'm thinking of retiring the MRv1 commit APIs: initially marking as 
deprecated. I'd match that with something to pre-emptively move spark onto the 
V2 one. After all, it's all bridged internally.







---




[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...

2018-11-20 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21066
  
The main barrier to this is the what-do-we-do-about-hive problem, as 
without it ASF Spark doesn't run against Hadoop 3.x

It looks like "support Hive 2" is the plan there, *which is the right thing 
to do long term*

short term, well, we're actually shipping this and the patched hive 1.2.x 
artifacts in HDP-3.0; qualifying through our own tests, etc. I'm happy with it.

It's also worth noting that there's work ongoing in Hadoop 3.2-3.3 to add 
multipart upload as an explicit API across filesystems, so you'll be able to 
write committers which can use multipart upload & commit across stores. 


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233871529
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

you are right: the user shouldn't need to be logging in again, as normal 
end-user use is to be kinited in, rather than giving a keytab


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233870838
  
--- Diff: core/pom.xml ---
@@ -408,6 +408,19 @@
   provided
 
 
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233870430
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

Although in theory we could fix up MR, distcp, Spark etc. to say "always ask 
for DTs", it may just encourage people to run with Kerberos off, which is never 
something they should be doing. I don't want to do that & am not actively 
playing with this approach. 


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r232825455
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+
+renameCompletedFiles: whether to rename completed 
files in previous batch (default: false). If the option is enabled, input file 
will be renamed with additional postfix "_COMPLETED_". This is useful to clean 
up old input files to save space in storage.
--- End diff --

S3 rename is O(data), whereas for real filesystems it is O(1). Azure is 
usually O(1) unless some cross-shard move takes place, when it drops to 
O(data); that is much rarer though.
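
To make the cost difference concrete, here is a toy model, not real S3 or Hadoop code: `FlatObjectStore` is a hypothetical in-memory stand-in for an object store, where a "rename" has to be done as copy then delete, so the bytes moved grow with the size of the data.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for an object store: a flat key -> bytes map.
final class FlatObjectStore {
  private val objects = mutable.Map.empty[String, Array[Byte]]
  private var copied = 0L

  // Total number of bytes copied by rename operations so far.
  def bytesCopied: Long = copied

  def put(key: String, data: Array[Byte]): Unit = objects.update(key, data)

  def exists(key: String): Boolean = objects.contains(key)

  // There is no rename primitive in this model: the data is copied to the new
  // key and the old key is deleted, so the work is proportional to object size.
  def rename(src: String, dst: String): Boolean = objects.get(src) match {
    case Some(data) =>
      objects.update(dst, data.clone())
      copied += data.length
      objects.remove(src)
      true
    case None =>
      false
  }
}
```

Renaming a 1 KiB object in this model copies 1024 bytes; renaming a 1 GiB input file would copy all of it, which is the cost a per-batch rename option would pay on such a store.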


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

2018-11-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r232824087
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
--- End diff --

I'd drop the s3n & s3 refs, as they have gone from deprecated to deceased.


---




[GitHub] spark pull request #22752: [SPARK-24787][CORE] Revert hsync in EventLoggingL...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22752#discussion_r226243409
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -449,7 +450,7 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   listing.write(info.copy(lastProcessed = newLastScanTime, 
fileSize = entry.getLen()))
 }
 
-if (info.fileSize < entry.getLen()) {
+if (info.fileSize < entry.getLen() || 
checkAbsoluteLength(info, entry)) {
--- End diff --

...there's no timetable for that getLength thing, but if HDFS already 
supports the API, I'm more motivated to implement it. It has benefits in cloud 
stores in general:
1. It saves apps doing an up-front HEAD/getFileStatus() to know how long their 
data is; the GET should return it.
2. For S3 Select, you get back the filtered data, so you don't know how much you 
will see until the GET is issued.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226235800
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME, 
KEYTAB, PRINCIPAL}
+import 
org.apache.spark.sql.kafka010.TokenUtil.KafkaDelegationTokenIdentifier
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
--- End diff --

If you are playing with UGI in tests, it's usually safest to 
call `UserGroupInformation.reset()` during setup and teardown; this empties out 
all existing creds and avoids problems with >1 test in the same JVM.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226235157
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
--- End diff --

are these always going to be valid? I.e. > 0?
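
If non-positive values are possible, one option would be to guard the formatting. A minimal sketch, assuming a value <= 0 means "not set" (that is an assumption, not something the Kafka API is known to promise); `TimestampRenderSketch` and `render` are hypothetical names, and UTC is fixed only to make the output deterministic:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object TimestampRenderSketch {
  // Format an epoch-millisecond timestamp for debug logging, or return a
  // placeholder when the value is not a positive timestamp.
  def render(timestampMs: Long): String =
    if (timestampMs > 0) {
      // SimpleDateFormat is not thread-safe, so create one per call.
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
      dateFormat.format(new Date(timestampMs))
    } else {
      "n/a"
    }
}
```

The debug line could then call `render` on each of the issue, expiry and max timestamps instead of formatting them directly.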


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226234330
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

OK: so this asks for DTs even if UGI says the cluster is insecure? 

Nothing wrong with that...I've been wondering what would happen if 
`HadoopFSDelegationTokenProvider` did the same thing: asked filesystems for 
their tokens even if in an insecure cluster, as it would let DT support in 
object stores (HADOOP-14556...) work without kerberos.

I'd test to make sure that everything gets through OK. AFAIK YARN is happy 
to pass round credentials in an insecure cluster (it gets the AM/RM token to 
the AM this way); it's more a matter of making sure the launcher chain is all 
ready for it.


---




[GitHub] spark pull request #22752: [SPARK-24787][CORE] Revert hsync in EventLoggingL...

2018-10-17 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22752#discussion_r225908701
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -449,7 +450,7 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   listing.write(info.copy(lastProcessed = newLastScanTime, 
fileSize = entry.getLen()))
 }
 
-if (info.fileSize < entry.getLen()) {
+if (info.fileSize < entry.getLen() || 
checkAbsoluteLength(info, entry)) {
--- End diff --

Have you looked @ this getFileLength() call to see how well it updates?
 
FWIW [HADOOP-15606](https://issues.apache.org/jira/browse/HADOOP-15606) 
proposes adding a method like this for all streams, though that proposal 
includes the need for specification and tests. Generally the HDFS team are a 
bit lax about that spec -> test workflow, which doesn't help downstream code or 
other implementations.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-09 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223625059
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  TokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
TokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  .equals(bootStrapServers))
--- End diff --

Why use `.equals()` rather than scalatest's `===` operator? `===` includes 
both values in the assertion failure it raises.
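
To make the difference concrete, here is a minimal sketch using only the Scala standard library. `assertSameValue` is a hypothetical stand-in for a values-aware check such as scalatest's `===`, not scalatest itself, and the exact wording of its message is an assumption.

```scala
object AssertionMessages {
  // Hypothetical stand-in for a values-aware check such as scalatest's `===`:
  // on failure the message carries both operands.
  def assertSameValue(actual: Any, expected: Any): Unit =
    if (actual != expected) {
      throw new AssertionError(s"$actual did not equal $expected")
    }

  // Runs a check and returns its failure message, if it failed.
  def failureMessage(check: => Unit): Option[String] =
    try { check; None }
    catch { case e: AssertionError => Some(String.valueOf(e.getMessage)) }
}
```

With a plain `assert(a.equals(b))` the failure only says that the assertion failed; the values-aware form tells whoever reads the test log what the two values actually were.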


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-09 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223623927
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+val keytab = sparkConf.get(KEYTAB)
+if (keytab.isDefined) {
+  val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+  require(serviceName.nonEmpty, "Kerberos service name must be 
defined")
+  val principal = sparkConf.get(PRINCIPAL)
+  require(principal.nonEmpty, "Principal must be defined")
+
+  val params =
+s"""
+|${getKrb5LoginModuleName} required
+| useKeyTab=true
+| serviceName="${serviceName.get}"
+| keyTab="${keytab.get}"
+| principal="${principal.get}";
+""".stripMargin.replace("\n", "")
+  logDebug(s"Krb JAAS params: $params")
+  Some(params)
+} else {
+  None
+}
+  }
+
+  private def getKrb5LoginModuleName(): String = {
--- End diff --

+ add a comment pointing at hadoop UserGroupInformation so that if someone 
ever needs to maintain it, they'll know where to look


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-02 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r222061609
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+if (sparkConf.get(KEYTAB).nonEmpty) {
+  Some(getKrbJaasParams(sparkConf))
+} else {
+  None
+}
+  }
+
+  def getKrbJaasParams(sparkConf: SparkConf): String = {
+val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+require(serviceName.nonEmpty, "Kerberos service name must be defined")
+val keytab = sparkConf.get(KEYTAB)
+require(keytab.nonEmpty, "Keytab must be defined")
+val principal = sparkConf.get(PRINCIPAL)
+require(principal.nonEmpty, "Principal must be defined")
+
+val params =
+  s"""
+  |com.sun.security.auth.module.Krb5LoginModule required
--- End diff --

There is a whole section in UGI code related to IBM JVMs changing the 
classnames of kerberos stuff. Just assume that any class with sun or oracle in 
the name will be different there and you won't be disappointed.
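
A minimal sketch of the vendor switch, modelled on the check in Hadoop's UserGroupInformation (IBM JVMs ship the module under `com.ibm.security.auth.module`). The object and method names here are illustrative, not taken from the patch.

```scala
object Krb5LoginModuleName {
  // Pure helper so the vendor check can be exercised without switching JVMs.
  def forVendor(javaVendor: String): String =
    if (javaVendor != null && javaVendor.contains("IBM")) {
      "com.ibm.security.auth.module.Krb5LoginModule"
    } else {
      "com.sun.security.auth.module.Krb5LoginModule"
    }

  // Resolves the module name for the JVM this code is running on.
  def current(): String = forVendor(System.getProperty("java.vendor"))
}
```

The JAAS params string can then interpolate `Krb5LoginModuleName.current()` rather than hard-coding the `com.sun` class name.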


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-01 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r221586243
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+if (sparkConf.get(KEYTAB).nonEmpty) {
+  Some(getKrbJaasParams(sparkConf))
+} else {
+  None
+}
+  }
+
+  def getKrbJaasParams(sparkConf: SparkConf): String = {
+val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+require(serviceName.nonEmpty, "Kerberos service name must be defined")
+val keytab = sparkConf.get(KEYTAB)
+require(keytab.nonEmpty, "Keytab must be defined")
+val principal = sparkConf.get(PRINCIPAL)
+require(principal.nonEmpty, "Principal must be defined")
+
+val params =
+  s"""
+  |com.sun.security.auth.module.Krb5LoginModule required
--- End diff --

IBM JAAS is different here; see [Hadoop and Kerberos: 
JAAS](https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/jaas.html)

In 
[ZKDelegationTokenSecretManager](https://github.com/apache/hadoop/blob/a55d6bba71c81c1c4e9d8cd11f55c78f10a548b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java#L241)
 this is done based on the JVM.


---




[GitHub] spark issue #22339: [SPARK-17159][STREAM] Significant speed up for running s...

2018-09-28 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22339
  
no, no cost penalties. Slightly lower namenode load too. If you had many, 
many spark streaming clients scanning directories, HDFS ops teams would 
eventually get upset. This will postpone the day


---




[GitHub] spark pull request #22339: [SPARK-17159][STREAM] Significant speed up for ru...

2018-09-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22339#discussion_r221326673
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+  .filter(_.isDirectory)
+  .map(_.getPath)
   val newFiles = directories.flatMap(dir =>
-fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+fs.listStatus(dir)
+.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+.map(_.getPath.toString))
   val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-  logInfo("Finding new files took " + timeTaken + " ms")
-  logDebug("# cached file times = " + fileToModTime.size)
+  logInfo(s"Finding new files took $timeTaken ms")
--- End diff --

It was originally @ info, so if it filled up logs *too much* there'd have 
been complaints. What's important is that the time to scan is printed, either 
@ info or debug, so someone can see what's happening. Probably what does need 
logging @ warn is when the time to scan is greater than the window, or just 
getting close to it.
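
A rough sketch of that rule, with hypothetical names and an assumed threshold of 80% of the window for "getting close"; the real code would call `logWarning`/`logDebug` instead of returning a level.

```scala
object ScanTimeLogging {
  sealed trait Level
  case object Debug extends Level
  case object Warn extends Level

  // warnFraction is an assumed tuning knob: warn once the scan takes this
  // fraction of the batch window or more.
  def levelFor(timeTakenMs: Long, windowMs: Long, warnFraction: Double = 0.8): Level =
    if (timeTakenMs >= windowMs * warnFraction) Warn else Debug
}
```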


---




[GitHub] spark issue #22339: [SPARK-17159][STREAM] Significant speed up for running s...

2018-09-28 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22339
  
Why the speedups? They come from that glob filter calling getFileStatus() on 
every entry, which is 1-3 HTTP requests and a few hundred millis per call, 
when instead that can be handled later. As a result, the more files you have in 
a path, the more time the scan takes, until eventually the scan time > window 
interval, at which point your code is dead.
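
The cost difference can be modelled without a real store. The sketch below uses a hypothetical in-memory `CountingStore` that counts simulated remote calls; it is not the Hadoop FileSystem API, just the shape of the two approaches.

```scala
object ListingCost {
  final case class Status(path: String, isDirectory: Boolean)

  // Hypothetical in-memory store; every method call stands for a remote request.
  final class CountingStore(entries: Seq[Status]) {
    var calls = 0
    def listStatus(): Seq[Status] = { calls += 1; entries }
    def getFileStatus(path: String): Status = {
      calls += 1
      entries.find(_.path == path).get
    }
  }

  // Old shape: a path filter that goes back to the store for every entry.
  def directoriesViaPathFilter(store: CountingStore): Seq[String] =
    store.listStatus().map(_.path).filter(p => store.getFileStatus(p).isDirectory)

  // New shape: reuse the status the listing already returned.
  def directoriesViaListing(store: CountingStore): Seq[String] =
    store.listStatus().filter(_.isDirectory).map(_.path)
}
```

For N entries the filter-based version makes N + 1 calls and the listing-based one makes a single call, which is where the scan time goes on an object store.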

The other stuff is simply associated optimisations.

Now, I'm obviously happy with this, especially as I seem to be getting credit. 
And it will help speed up working with any store. But I need to warn people: it 
is not sufficient.

The key problem here is: files uploaded by S3 multipart upload get a 
timestamp on when the upload began, not finished —yet only become visible at 
the end of the upload. If a caller starts up an upload in window t, and doesn't 
complete it until window t+1, then it may get missed.

There's not much which can be done here, except in documenting the risk.
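
A toy model of the failure mode, under the assumption that a scanner only accepts objects whose timestamp falls inside the current window; the types and the selection rule are hypothetical and much simpler than FileInputDStream's real bookkeeping.

```scala
object MultipartUploadWindow {
  // modTimeMs: timestamp the store reports (upload start for multipart uploads).
  // visibleAtMs: when the object first shows up in listings (upload completion).
  final case class StoreObject(name: String, modTimeMs: Long, visibleAtMs: Long)

  // Naive scan at time nowMs: visible objects stamped within (nowMs - windowMs, nowMs].
  def scan(objects: Seq[StoreObject], nowMs: Long, windowMs: Long): Seq[String] =
    objects
      .filter(o => o.visibleAtMs <= nowMs)
      .filter(o => o.modTimeMs > nowMs - windowMs && o.modTimeMs <= nowMs)
      .map(_.name)
}
```

An upload started at t=500 and completed at t=1500 is invisible to the scan at t=1000, and at t=2000 its timestamp of 500 is already outside the window, so neither scan picks it up.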

What is a good solution? It'd be to use the cloud infrastructure provider's 
own event notification mechanism and subscribe to changes in a store. AWS, 
Azure and GCS all offer something like this. 

There's a home for the S3 one of those in aws-kinesis, perhaps


---




[GitHub] spark pull request #22339: [SPARK-17159][STREAM] Significant speed up for ru...

2018-09-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22339#discussion_r22134
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+  .filter(_.isDirectory)
+  .map(_.getPath)
   val newFiles = directories.flatMap(dir =>
-fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+fs.listStatus(dir)
+.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+.map(_.getPath.toString))
   val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-  logInfo("Finding new files took " + timeTaken + " ms")
-  logDebug("# cached file times = " + fileToModTime.size)
+  logInfo(s"Finding new files took $timeTaken ms")
--- End diff --

depends on how big it grows over time


---




[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...

2018-09-18 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21048
  
if this is being looked at again, it'd be nice to have a reference back end 
which just did the write straight to the destination: this is exactly what all 
the public cloud stores (s3, azure-*, gcs) offer as their atomic write. There'd 
be no need to make things clever and automatically switch; just make it 
something people can ask for.


---




[GitHub] spark issue #22444: [SPARK-25409][Core]Speed up Spark History loading via in...

2018-09-18 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22444
  
I see the reasoning here

* @jianjianjiao has a very large cluster with many thousands of history 
files of past (successful) jobs.
* history server startup needs to go through all these logs before being 
usable, so any server restart results in hours of downtime, just from scanning.
* this patch breaks things up to be incremental.

I don't have any opinions on the patch itself; I've not looked at that code 
for so long my reviews are probably dangerous.

Two thoughts: 

1. Would it make sense for the initial scans to go for the most recent logs 
first? That 2.5 hour time to scan all files is still there. 
1. Would you want the UI and REST API to indicate that the scan was still 
in progress, and not to worry if the listing was incomplete?
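
The first of those could be as small as ordering the listing before processing it. A sketch with a hypothetical `LogFile` type (the history provider's own listing types would be used in practice):

```scala
object NewestFirst {
  final case class LogFile(path: String, modTimeMs: Long)

  // Most recently modified logs come first, so recent applications
  // show up early while the rest of the scan continues.
  def scanOrder(logs: Seq[LogFile]): Seq[LogFile] =
    logs.sortBy(_.modTimeMs)(Ordering[Long].reverse)
}
```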


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-05 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22327
  
The goal for the 2.7.x line should be "nothing breaks", which is precisely 
why it's only getting critical patches. Reverting might make the problem go 
away, but you can assume that everyone running HDFS clusters with something 
based on the 2.7.x line will get this patch in before long: it's best to 
identify what's up and address it


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-05 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22327
  
The 2.7.x branch updates are generally all security plus some fixes for 
JVM/JDK regressions.

Without looking at the details, you can assume that the regression will be 
related to one of these, and for that reason I wouldn't recommend rolling back. 
Better to find the problem and come up with a fix or workaround.

Created [HADOOP-15722](https://issues.apache.org/jira/browse/HADOOP-15722) 
to cover this.


---




[GitHub] spark issue #17745: [SPARK-17159][Streaming] optimise check for new files in...

2018-09-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17745
  
Patch is in the spark cloud integration module, you can take it and try to 
get into ASF spark provided you also add some credit to me in the patch.




---




[GitHub] spark issue #22186: [SPARK-25183][SQL] Spark HiveServer2 to use Spark Shutdo...

2018-08-31 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
thanks


---




[GitHub] spark issue #22213: [SPARK-25221][DEPLOY] Consistent trailing whitespace tre...

2018-08-31 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22213
  
code LGTM. Clearly it's a tangible problem, especially for some one-char 
option like "myapp.line.separator"


---




[GitHub] spark issue #22213: [SPARK-25221][DEPLOY] Consistent trailing whitespace tre...

2018-08-30 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22213
  
This actually makes sense. We always forget this, but java properties file 
format is [more complex than any of us 
remember](https://docs.oracle.com/javase/10/docs/api/java/util/Properties.html#load(java.io.Reader))

At the time of this trim taking place, all CR/LF chars in the source file 
will have been stripped through one of
* being the end of an entry: property contains all chars up to that line 
break (or line skipped if empty/comment)
* being preceded by a backslash, in which case the following line will 
have its initial whitespace stripped and then be joined to the preceding line.

Whoever did the wikipedia article [did some good 
examples](https://en.wikipedia.org/wiki/.properties)

What this means is: by the time the spark trim() code is reached, the only 
CR and LF entries in a property are those from expanding \r and \n character 
pairs in the actual property itself. All of these within a property, e.g. 
`p1=a\nb`, already get through; this extends it to properties like `p2=\r`. 

* It should be easy to write some tests for `trimExceptCRLF()` 
directly, e.g. how it handles odd strings (one char, value == 0), empty string, 
...
* There's an XML format for properties too, which should also be tested to 
see WTF goes on there. 
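
For reference, a self-contained sketch of the behaviour being discussed. `java.util.Properties` and `String.trim` are the real JDK calls; the `trimExceptCRLF` body is my own guess at a reasonable implementation, not a copy of the one in the patch.

```scala
import java.io.StringReader
import java.util.Properties

object TrimExceptCRLF {
  // Sketch: trims like String.trim, but keeps leading/trailing CR and LF.
  def trimExceptCRLF(str: String): String = {
    val keep: Char => Boolean = ch => ch > ' ' || ch == '\r' || ch == '\n'
    val first = str.indexWhere(keep)
    val last = str.lastIndexWhere(keep)
    if (first >= 0) str.substring(first, last + 1) else ""
  }

  // Parses properties text the way Properties.load would parse a file.
  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }
}
```

Loading the two lines `p1=a\nb` and `p2=\r` (literal backslash escapes in the file) gives `p2` the single character CR; a plain `trim` turns that into the empty string, while the CR/LF-preserving trim leaves it alone.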

PS, looking up for the properties spec highlights that Java 9 [uses UTF-8 
for the properties 
encoding](https://docs.oracle.com/javase/9/intl/internationalization-enhancements-jdk-9.htm#JSINT-GUID-974CF488-23E8-4963-A322-82006A7A14C7).
 Don't know of any implications here. 






---




[GitHub] spark issue #22186: [SPARK-25183][SQL][WIP] Spark HiveServer2 to use Spark S...

2018-08-29 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
This will eliminate a race condition between FS shutdown (in the hadoop 
shutdown manager) and the hive callback. There's a risk today that the 
filesystems will be closed before that event log close()/rename() is called, so 
things don't get saved, and this can happen with any FS.

Registering the shutdown hook via the spark APIs, with a priority higher than 
the FS shutdown, guarantees that it will be called before the FS shutdown. But 
it doesn't guarantee that the operation will complete within the 10s time limit 
hard coded into Hadoop 2.8.x+ for any single shutdown hook to complete. It is 
going to work in HDFS except in the special case of HDFS NN lock or GC pause.
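
To illustrate both halves of that (ordering by priority, and a per-hook time limit), here is a toy runner built on `java.util.concurrent`. It is a simplified model with made-up names, not Hadoop's ShutdownHookManager or Spark's wrapper around it.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

object PrioritisedHooks {
  final case class Hook(name: String, priority: Int, body: () => Unit)

  // Runs hooks highest priority first, giving each at most timeoutMs.
  // Returns the names of the hooks that completed within the limit.
  def runAll(hooks: Seq[Hook], timeoutMs: Long): Seq[String] = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      hooks.sortBy(h => -h.priority).flatMap { hook =>
        val future = executor.submit(new Runnable {
          override def run(): Unit = hook.body()
        })
        try {
          future.get(timeoutMs, TimeUnit.MILLISECONDS)
          Some(hook.name)
        } catch {
          case _: TimeoutException =>
            future.cancel(true) // interrupt the overrunning hook and move on
            None
        }
      }
    } finally {
      executor.shutdownNow()
    }
  }
}
```

A higher-priority event log hook always starts before the lower-priority filesystem close, but if it overruns the limit it is abandoned, which is why the size of the timeout matters for slow renames.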

The Hadoop configurable delay of 
[HADOOP-15679](https://issues.apache.org/jira/browse/HADOOP-15679) needs to go 
in. I've increased the default timeout to 30s there for more forgiveness with 
HDFS, and for object stores with O(data) renames people should configure it 
with a timeout of minutes, or, if they want to turn it off altogether, hours. 

I'm backporting HADOOP-15679 to all branches 2.8.x+, so all hadoop versions 
with that timeout will have the timeout configurable & the default time 
extended.


---




[GitHub] spark issue #22186: [SPARK-25183][SQL][WIP] Spark HiveServer2 to use Spark S...

2018-08-28 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
The latest patch builds locally

Maven test outcome
* lots of json missing method errors, clearly jackson version problems of 
some kind
* I don't see log messages of hive shutdown appearing in the output, though 
after all the tests finish I do get a log showing the FS cleanup is going on

```
18/08/28 22:09:58 INFO ShutdownHookManager: Shutdown hook called
18/08/28 22:09:58 INFO ShutdownHookManager: Deleting directory 
...spark/sql/hive-thriftserver/target/tmp/
```

I think it might be possible to actually test whether the shutdown hook was 
added by calling remove(hook) in a test and verifying that the hook was found, 
that is, it was registered. That needs some caching of the hook and a 
package-level removeHook method in the HiveServer, though wiring it all the way 
up to a test case would be tricky...


---




[GitHub] spark issue #22186: [SPARK-25183][SQL][WIP] Spark HiveServer2 to use Spark S...

2018-08-28 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
My local build wasn't including that module; it now does and the link works 
with a subclass of `AbstractFunction0`.

The local tests are failing under maven with hive/jackson mismatch though. 
I'm going to consider that a separate issue.

```
#c=cvalue;d=dvalue
- SPARK-16563 ThriftCLIService FetchResults repeat fetching result *** 
FAILED ***
  java.sql.SQLException: java.lang.NoSuchMethodError: 
org.json4s.jackson.JsonMethods$.parse$default$3()Z
  at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftJdbcTest$$anonfun$withMultipleConnectionJdbcStatement$2.apply(HiveThriftServer2Suites.scala:814)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftJdbcTest$$anonfun$withMultipleConnectionJdbcStatement$2.apply(HiveThriftServer2Suites.scala:813)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftJdbcTest.withMultipleConnectionJdbcStatement(HiveThriftServer2Suites.scala:813)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftJdbcTest.withJdbcStatement(HiveThriftServer2Suites.scala:822)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite$$anonfun$2$$anonfun$apply$mcV$sp$2.apply(HiveThriftServer2Suites.scala:100)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite$$anonfun$2$$anonfun$apply$mcV$sp$2.apply(HiveThriftServer2Suites.scala:96)
  at 
org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite.org$apache$spark$sql$hive$thriftserver$HiveThriftBinaryServerSuite$$withCLIServiceClient(HiveThriftServer2Suites.scala:71)
```


---




[GitHub] spark issue #22186: [SPARK-25183][SQL][WIP] Spark HiveServer2 to use Spark S...

2018-08-26 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
My local maven build *did* work, so maybe it's a javac/JVM version thing. 
Will move back to a java class callback.


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-26 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212822877
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

Still a lot; I think we can do a new one.

Latest version of this code is 
[here](https://github.com/hortonworks-spark/cloud-integration/tree/master/spark-cloud-integration);
 I think it's time to set up a module in Bahir for this.


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212391371
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

globStatus is flawed; its key limitation is that it does a tree walk. It needs 
to be replaced with an implementation specific to object-store listing. See 
[HADOOP-13371](https://issues.apache.org/jira/browse/HADOOP-13371).

The issue with implementing an s3a flat-list and filter arises when the 
wildcard is a few entries up from the child path and there are lots of 
children, e.g.

```
s3a://bucket/data/year=201?/month=*/day=*/
```

then if there are many files under the year/month/day entries, all of them get 
listed during the filter. 

What I think would need to be done is to make the FS configurable so that it 
limits the depth at which it switches to bulk listing; so here I could say 
"depth=2", and the year=? would be done via globbing, while the month= and 
day= levels would be handled by the bulk listing.

Or maybe just start with making the whole thing optional, and let the 
caller deal with it.

Anyway, the options here:

* Fix the Hadoop side call. Nice and broadly useful.
* See if spark can be moved off the globStatus call. That will change the 
matching. But if you provide a new "cloudstore" connector, that could be done, 
couldn't it?
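
The depth-limit idea above can be sketched roughly as follows. This is only an illustration in plain Java with no Hadoop dependency: `GlobSplit`, `split` and the `depth` parameter are hypothetical names, not an existing Hadoop or Spark API. It splits a glob path into a prefix that would still be expanded by globbing and a remainder that would be applied as a filter over a bulk listing of each expanded prefix.

```java
import java.util.Arrays;

/** Hypothetical sketch: split a glob path at a configured depth. */
public final class GlobSplit {
    /** Leading elements, still expanded by tree-walk globbing. */
    public final String globPrefix;
    /** Trailing elements, applied as a filter over a flat listing. */
    public final String listFilter;

    private GlobSplit(String globPrefix, String listFilter) {
        this.globPrefix = globPrefix;
        this.listFilter = listFilter;
    }

    /**
     * @param path  path below the bucket root, e.g. "data/year=201?/month=*"
     * @param depth number of leading path elements left to globbing
     */
    public static GlobSplit split(String path, int depth) {
        String[] parts = path.split("/");
        // Clamp the cut point so any depth value is safe.
        int cut = Math.min(Math.max(depth, 0), parts.length);
        String prefix = String.join("/", Arrays.copyOfRange(parts, 0, cut));
        String rest = String.join("/", Arrays.copyOfRange(parts, cut, parts.length));
        return new GlobSplit(prefix, rest);
    }

    public static void main(String[] args) {
        GlobSplit s = split("data/year=201?/month=*/day=*", 2);
        System.out.println("glob:   " + s.globPrefix);
        System.out.println("filter: " + s.listFilter);
    }
}
```

With `depth=2` the `year=201?` element stays in the globbed prefix and the `month=*/day=*` remainder becomes the listing filter, which matches the split described above; how the filter is then matched against listed keys is left out of the sketch.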


---




[GitHub] spark issue #22186: [SPARK-25183][SQL][WIP] Spark HiveServer2 to use Spark S...

2018-08-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22186
  
Not sure what is up with the build here; it worked with mvn locally. Possibly 
my use of a Java 8 lambda expression as the hook?


---




[GitHub] spark pull request #22186: [SPARK-25183][SQL] Spark HiveServer2 to use Spark...

2018-08-22 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/22186

[SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager

## What changes were proposed in this pull request?

Switch `org.apache.hive.service.server.HiveServer2` to register its 
shutdown callback with Spark's `ShutdownHookManager`, rather than directly 
with the Java Runtime.

This avoids race conditions in shutdown where the filesystem is shut down 
before the flush/write/rename of the event log is completed, particularly on 
object stores where the write and rename can be slow.
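
To illustrate why going through a single hook manager avoids the race, here is a minimal sketch of a priority-ordered hook registry. It is not Spark's or Hadoop's `ShutdownHookManager`; `OrderedHooks`, its methods and the priority values are hypothetical. The point it shows is that hooks registered in one place run in a defined order, whereas hooks registered directly with the Java Runtime are started in no guaranteed order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of a priority-ordered shutdown hook registry. */
public final class OrderedHooks {
    private static final class Entry {
        final int priority;
        final Runnable hook;

        Entry(int priority, Runnable hook) {
            this.priority = priority;
            this.hook = hook;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Register a hook; higher priority values run earlier. */
    public synchronized void add(int priority, Runnable hook) {
        entries.add(new Entry(priority, hook));
    }

    /** Run every registered hook, highest priority first. */
    public synchronized void runAll() {
        List<Entry> ordered = new ArrayList<>(entries);
        ordered.sort(Comparator.comparingInt((Entry e) -> e.priority).reversed());
        for (Entry e : ordered) {
            e.hook.run();
        }
    }

    /** Install a single JVM hook that drives the ordered sequence. */
    public void install() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::runAll));
    }

    public static void main(String[] args) {
        OrderedHooks hooks = new OrderedHooks();
        hooks.add(10, () -> System.out.println("close filesystem"));
        hooks.add(50, () -> System.out.println("stop server, flush event log"));
        hooks.install();
    }
}
```

In this sketch the server's hook is given a higher priority than the filesystem close, so the event log can be flushed while the filesystem is still open.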

## How was this patch tested?

There's no explicit unit test for this, which is consistent with every 
other shutdown hook in the codebase.

* There's an implicit test when the scalatest process is halted.
* More manual/integration testing is needed.

HADOOP-15679 has added the ability to explicitly execute the hadoop 
shutdown hook sequence which spark uses; that could be stabilized for testing 
if desired, after which all the spark hooks could be tested. Until then: 
external system tests only.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark BUG/SPARK-25183-shutdown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22186


commit 8fbeb59c8a96dd7f7ed6982009bac59ab3fa87ce
Author: Steve Loughran 
Date:   2018-08-22T17:13:19Z

SPARK-25183 Spark HiveServer2 to use Spark ShutdownHookManager for shutdown 
hook

Change-Id: I9a0885660efda4ec6277e0237ca7eada0b43533f




---




[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2018-08-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
Well, no obvious answer there I'm afraid, except "don't put HDFS JARs on 
the classpath"; if you serve them up via HTTP all should work


---




[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2018-08-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
Hmmm. [SPARK-21697](https://issues.apache.org/jira/browse/SPARK-21697) has a 
lot of the same classpath involved, but the problem in that one is some 
recursive loading of artifacts off HDFS, the scan for 
commons-logging.properties being the troublespot. 

@rajeshcode, what you have seems more like a classic "class not found" 
problem, where one class is loading, but a dependency isn't being found. And as 
HDFS has moved its stuff around, splitting the single hadoop-hdfs JAR into 
separate client and server JARs, that may be the cause.



---




[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2018-08-19 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
At a guess, there's possibly a mix of versions among the hadoop hdfs JARs on 
your classpath. Are you sure everything on the classpath is in sync? What JARs 
with hadoop-hdfs in their name are there?
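
One way to answer that question is to scan the classpath for matching JAR names. The following is a small sketch, assuming the JARs of interest are on the JVM launch classpath (`java.class.path`); JARs added later through Spark's own classloaders would not show up. `ClasspathScan` and `matching` are hypothetical names used only for this illustration.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch: find classpath entries whose file name contains a fragment. */
public final class ClasspathScan {
    private ClasspathScan() {
    }

    /** Return the classpath entries whose file name contains the fragment. */
    public static List<String> matching(String classpath, String fragment) {
        List<String> hits = new ArrayList<>();
        // Quote the separator so it is treated literally by split().
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            if (new File(entry).getName().contains(fragment)) {
                hits.add(entry);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String classpath = System.getProperty("java.class.path");
        for (String jar : matching(classpath, "hadoop-hdfs")) {
            System.out.println(jar);
        }
    }
}
```

If the output lists hadoop-hdfs JARs with different version suffixes, that would point to the version mix suspected above.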


---




[GitHub] spark pull request #22117: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-16 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/22117


---




[GitHub] spark issue #22081: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-16 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22081
  
Thanks. Two fewer JARs on the CP to keep up to date; what more can anyone 
want?


---




[GitHub] spark issue #22117: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-16 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22117
  
Test failure in `
org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a 
sbt.testing.SuiteSelector)`: 

```
Caused by: sbt.ForkMain$ForkError: java.lang.NoClassDefFoundError: javax/jdo/JDOException
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5501)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:184)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:73)
    ... 41 more
Caused by: sbt.ForkMain$ForkError: java.lang.ClassNotFoundException: javax.jdo.JDOException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:227)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:216)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 44 more
```

Somehow the datanucleus JARs aren't on the CP for the hive test. I can't see 
how this patch is causing this; can anyone else? And if it isn't, why is it 
surfacing here?


---




[GitHub] spark pull request #22117: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-15 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/22117

[SPARK-23654][BUILD] remove jets3t as a dependency of spark

# What changes were proposed in this pull request?

Remove jets3t dependency, and bouncy castle which it brings in; update 
licenses and deps

Note this is just #22081 with merge conflict resolved; submitting to see 
what jenkins says.

# How was this patch tested?

Existing tests on a JVM with unlimited Java Crypto Extensions

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark incoming/PR-22081-jets3t

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22117


commit 3cad78f8bb9bc0dc841cd0c31e0b0d52f8e7c764
Author: Sean Owen 
Date:   2018-08-11T21:41:38Z

Remove jets3t dependency, and bouncy castle which it brings in; update 
licenses and deps




---




[GitHub] spark issue #22099: [SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

2018-08-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22099
  
thanks


---




[GitHub] spark pull request #21146: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-15 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/21146


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
Closing now that we have a test run with the combination of: no jets3t, no 
bouncy castle, upgraded kinesis. *all the kinesis tests now run*


---




[GitHub] spark issue #22099: [SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

2018-08-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22099
  
> To be clear you think this passed because it still uses jets3t and that 
still brings in BC? 
correct

> Then we can maybe merge this and rebase the other change to find out. 
correct

> This update won't have changed that situation with strong crypto being 
required right?

don't know. What it did do was stop my local test runs without bouncy 
castle failing with errors about certificate validation. 

This patch is a good thing to do anyway, because it's good to stay somewhat 
current with the AWS releases (more chance of issues being addressed, reduced 
cost of future migrations). So it can be merged in and then the problem of 
getting #22081's test run to work addressed after. 


I reopened #21146 & applied this patch to it, to see what Jenkins did 
there. The overall test runs come out as failing; it's hard to point to any 
related cause, but the Kinesis ones do all pass: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94769/testReport/org.apache.spark.streaming.kinesis/

I'm going to close that one again to avoid confusion about which of the 
"remove jets3t" patches people should be looking at; once the kinesis update is 
merged in you'll need to retest your #22081 PR and let's see what Jenkins says 
there


---




[GitHub] spark issue #22081: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22081
  
Making it a test-time option is a reasonable idea; getting the unlimited JCE 
onto the test machines (they don't have it right now) would remove the need 
for this.


---




[GitHub] spark issue #22099: [SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

2018-08-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22099
  
Local kinesis tests with -Phadoop-3.1, -Phadoop-2.7 and `-Phadoop-3.1 
-Dhadoop.version=3.1.1` are all working here (with bouncycastle, unlimited JCE 
in JVM).

I'm updating the #21146 PR with this patch to see what happens with the 
combination in Jenkins of no bouncycastle, updated Kinesis. 

Test run failure here was 
`org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite.offset recovery 
from kafka`; hard to see how it relates


---




[GitHub] spark pull request #21146: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-14 Thread steveloughran
GitHub user steveloughran reopened a pull request:

https://github.com/apache/spark/pull/21146

[SPARK-23654][BUILD] remove jets3t as a dependency of spark

## What changes were proposed in this pull request?

With the update of the bouncy-castle JAR in Spark 2.3, jets3t doesn't work any 
more; hence the s3n connector to S3 is dead. Only one person has noticed so 
far. The hadoop s3n connector is never going to be updated to work with a 
later version of jets3t; instead the code has just been [cut from hadoop 3.x 
entirely](http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/s3n.html).

This patch removes the declarations of jets3t from the POMs which include it 
(root and spark-core), so it is not being packaged up.

Of the transitive dependencies, one is pinned at the same version, the 
others removed.

* javax.activation is kept at version 1.1.1 by an explicit import; without 
this it would be downgraded to version 1.0.
* bcprov-jdk15on-1.58.jar, base64-2.3.8.jar, java-xmlbuilder-1.1.jar are 
all removed. They are not directly used in Spark.

## How was this patch tested?

Seeing what jenkins has to say about the missing/changed dependencies. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark cloud/SPARK-23654-jets3t

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21146


commit f4b00c2d464f6c29f6ed97974ad256b5cb8b3fcc
Author: Steve Loughran 
Date:   2018-03-28T09:34:11Z

SPARK-23654 remove jets3t as a dependency of spark
This also updates the dev/deps dependency files so it is not expected

Change-Id: I15798710925d378de97523f7f89dbe5bd1cc8582

commit db77f2812dca195f9e940625421d5625e8497b93
Author: Steve Loughran 
Date:   2018-07-04T15:26:11Z

[SPARK-23654]. Explicitly add javax.activation-1.1.1 to the spark-core 
dependencies so the version it transitively exports is the same as the one it 
used to when jets3t was on the import graph. Remove the other JARs from the 
deps list.

Change-Id: If22bb1b26381e2e8a3df050a535d644f45d306fe

commit eba7c820bb0fd14a382afbd76911a16fd8ab0d68
Author: Steve Loughran 
Date:   2018-07-14T15:18:58Z

[SPARK-23654] inline javax.activation version

Change-Id: I1298d6c3063ece8bc86575d781a85ba5309cda39

commit 9adc0ec943a0bc867e2017b0ecc1f028da4ea2f6
Author: Steve Loughran 
Date:   2018-08-13T18:10:57Z

[SPARK-23654] purge all of bouncy-castle from the POM and from all license 
files

Change-Id: Iaf2e87ca57d46592551cfdbe6d5f8b419a92bcae

commit 853ee1cf1143073acec9f3d0cdade7d445ecc4c7
Author: Steve Loughran 
Date:   2018-08-14T05:38:02Z

[SPARK-25111] increment kinesis client/producer lib versions & aws-sdk to 
match

Change-Id: Ibf4162fca33189086bef234b6752f403a06aa7b0




---




[GitHub] spark issue #22081: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22081
  
No, the SDKs don't pull in bouncy-castle. Checked via mvnrepository:

* [core 
sdk](http://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-core/1.11.271)
 pulls in jackson & httpclient; historically v. fussy about httpclient versions
* [kinesis 
SDK](http://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.8.10):
 adds protobuf-2.6, guava 18.0, others.

I've checked with Shane: the jenkins systems do not have the unlimited 
javax crypto installed, so I suspect that bouncy-castle is just needed for 
testing.


---




[GitHub] spark issue #22099: [SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22099
  
@srowen @budde @ajfabbri


---




[GitHub] spark issue #22099: [SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22099
  
As noted in #21146, stripping off bouncy castle and upgrading the SDK 
worked. But a local test run of just this patch brought up the same error seen 
in #22081:

```
WithoutAggregationKinesisStreamSuite:
- KinesisUtils API
- RDD generation
- basic operation
- custom message handling *** FAILED ***
  The code passed to eventually never returned normally. Attempted 20 times 
over 2.092846262916667 minutes. Last failure message: 
collected.synchronized[Boolean](KinesisStreamTests.this.convertToEqualizer[scala.collection.mutable.HashSet[Int]](collected).===(modData.toSet[Int])(scalactic.this.Equality.default[scala.collection.mutable.HashSet[Int]]))
 was false
  Data received does not match data sent. (KinesisStreamSuite.scala:230)
- Kinesis read with custom configurations
- split and merge shards in a stream
- failure recovery *** FAILED ***
  The code passed to eventually never returned normally. Attempted 105 
times over 2.0055098129 minutes. Last failure message: isCheckpointPresent was 
true, but 0 was not greater than 10. (KinesisStreamSuite.scala:398)
```
That wasn't a full clean build, so let's see what Jenkins says and some 
more test runs tomorrow. It could just be this is all showing up some flakiness 
in the test case. At the very least, some more details on the failure might be 
good.


---




[GitHub] spark pull request #22099: [SPARK-25111][BUILD] increment kinesis client/pro...

2018-08-13 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/22099

[SPARK-25111][BUILD] increment kinesis client/producer & aws-sdk versions

## What changes were proposed in this pull request?

Increment the kinesis client, producer and transitive AWS SDK versions to a 
more recent release.

This is to help with the move off bouncy castle of #21146 and #22081; the 
goal is that moving up to the new SDK will allow a JVM with unlimited JCE but 
without bouncy castle to work with Kinesis endpoints. 

Why this specific set of artifacts? It syncs up with the 1.11.271 AWS SDK 
used by hadoop 3.0.3, hadoop-3.1 and hadoop 3.1.1; that's been stable for the 
uses there (s3, STS, dynamo). 

## How was this patch tested?

Running all the external/kinesis-asl tests via maven with java 8.121 & 
unlimited JCE, without bouncy castle (#21146); default endpoint of us-west-2. 
Without this SDK update I was getting http cert validation errors; with it 
they went away.

# This PR is not ready without 

* Jenkins test runs to see what it is happy with
* more testing: repeated runs, another endpoint
* looking at the new deprecation warnings and selectively addressing them 
(the AWS SDKs are pretty aggressive about deprecation, but sometimes they 
increase the complexity of the client code or block some codepaths off 
completely)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark cloud/SPARK-25111-kinesis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22099.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22099


commit e79e5b9c0bbdf24dcc3cda30dc2c1a70d12b02aa
Author: Steve Loughran 
Date:   2018-08-14T05:38:02Z

[SPARK-25111] increment kinesis client/producer lib versions & aws-sdk to 
match.

Change-Id: Ic2d12a07d273bd1b6fc4c681075070f22ed1e44c




---




[GitHub] spark issue #21146: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
And if you bump up the kinesis client and AWS SDK version to 1.11.271, 
those failures go away. 

```
Run completed in 15 minutes, 28 seconds.
Total number of tests run: 59
Suites: completed 9, aborted 0
Tests: succeeded 59, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] 

[INFO] BUILD SUCCESS
[INFO] 

[INFO] Total time: 16:09 min
[INFO] Finished at: 2018-08-13T22:35:17-07:00
[INFO] 

```


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
FYI, I just did a kinesis test run with this PR on a JVM with the unlimited 
JCE installed (explicitly verified by shasum of the relevant JARs); failure 
with cert errors.

```
success on second attempt after a Kinesis throttling exception
- retry success on second attempt after a Kinesis dependency exception
- retry failed after a shutdown exception
- retry failed after an invalid state exception
- retry failed after unexpected exception
- retry failed after exhausting all retries
WithAggregationKinesisBackedBlockRDDSuite:
[2018-08-13 22:01:12.872175] [0x7b6f9000] [info] 
[kinesis_producer.cc:79] Created pipeline for stream 
"KinesisTestUtils-3116048591482212471"
[2018-08-13 22:01:12.872918] [0x7b6f9000] [info] [shard_map.cc:83] 
Updating shard map for stream "KinesisTestUtils-3116048591482212471"
[2018-08-13 22:01:13.040026] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
monitoring.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:13.054034] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:15.943229] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:18.924473] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:21.919673] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:25.121685] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:27.925785] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:30.917030] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:33.960962] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:36.926987] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:39.912528] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed
[2018-08-13 22:01:42.911743] [0x7b905000] [error] 
[http_client.cc:148] Failed to open connection to 
kinesis.us-west-2.amazonaws.com:443 : certificate verify failed

org.apache.spark.streaming.kinesis.WithAggregationKinesisBackedBlockRDDSuite 
*** ABORTED ***
  java.lang.IllegalArgumentException: requirement failed: Need data to be 
sent to multiple shards
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests$$anonfun$beforeAll$1.apply$mcV$sp(KinesisBackedBlockRDDSuite.scala:47)
  at 
org.apache.spark.streaming.kinesis.KinesisFunSuite$class.runIfTestsEnabled(KinesisFunSuite.scala:41)
  at 
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:25)
  at 
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:42)
  at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:212)
  at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
  at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
  at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
  ...
```

The assert failure is possibly a follow-on from the previous problem.


---




[GitHub] spark issue #22081: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/22081
  
I've just pushed up my PR which is ~ in sync with this one; I'll close that 
one now and this can be the one to use. 

Assume: kinesis uses bouncy castle somewhere. There are some hints in the 
AWS docs.

[Encrypt and Decrypt Amazon Kinesis Records Using AWS 
KMS](https://aws.amazon.com/blogs/big-data/encrypt-and-decrypt-amazon-kinesis-records-using-aws-kms/)
 covers end-to-end encryption of Kinesis records. For this you need the AWS 
encryption SDK, whose docs [say you need bouncy 
castle](https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/java.html).

And it looks like the AWS encryption SDK does explicitly [depend on bouncy 
castle](http://mvnrepository.com/artifact/com.amazonaws/aws-encryption-sdk-java/1.3.5).

Imagine if *somehow* the removal of bouncy castle as a java crypto provider 
was stopping that round trip working with some of the encrypt/decrypt not 
happening. In which case adding bouncy castle should fix things. It worked 
before because jets3t in spark-core added bouncy castle, and the last 
bouncy-castle version update made it in sync with kinesis (and broke jets3t, 
but nobody has noticed...)

But:
* There are no refs to javax.crypto, the aws crypto libs or calls to the 
class `KinesisEncryptionUtils` referenced in the blog post in the spark kinesis 
module (it's not in the latest SDKs either).
* There's no build-time dependency on the aws-sdk encryption, which would 
transitively pull in the bouncy castle stuff.
* Looking through the aws-sdk-bundle: no refs to javax.crypto in the 
kinesis code; encryption refs limited to the PUT request where you can request 
server-side encryption with a given KMS key. 
* Nor is there any `com.aws.encryptionsdk` in that bundle, or shaded bouncy 
castle (which is good, as otherwise I'd have to deal with the fact that some 
ASF projects were shipping a shaded version of it unknowingly)

It could just be that a strong java crypto provider is needed, and in the 
absence of the unlimited java crypto JAR in the JDK lib dir (where it's needed 
for kerberos to work), bouncy-castle needs to be on the CP.

What to do?

1. You can remove jets3t independent of the bouncy castle changes, because 
Kinesis isn't going to be using jets3t. The aws-s3 module significantly 
supersedes the jets3t client's functionality, and is the only one you'd expect 
the other parts of the AWS SDK to pick up. 
1. The bouncy-castle dependency could be upgraded to a later version in the 
kinesis module(s) alone, and explicitly added to kinesis-asl.
1. Someone needs to do some experiments with what happens to the test suite 
with/without the full JCE and bouncy castle, maybe including more details on 
what's not matching up in the round trip tests.
1. Maybe including some new test which somehow explores what encryption 
algorithms/keys you get with/without the BC and JCE-unlimited JARs.
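
A probe of the kind suggested in the last item could start from something like the sketch below. It uses only standard JDK calls (`Cipher.getMaxAllowedKeyLength`, `Security.getProviders`, `Security.getProvider`); the class name `CryptoProbe` and its helper methods are hypothetical. It assumes that a maximum AES key length above 128 bits indicates the unlimited-strength policy, and that bouncy castle, when registered as a provider, uses the name "BC". A JAR merely being on the classpath does not register a provider, so this would only show part of the picture.

```java
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.security.Security;
import javax.crypto.Cipher;

/** Hypothetical sketch: report what crypto strength and providers the JVM has. */
public final class CryptoProbe {
    private CryptoProbe() {
    }

    /** Maximum AES key length, in bits, permitted by the installed policy. */
    public static int maxAesKeyLength() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            // AES is a required algorithm, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    /** True if the policy allows AES keys longer than 128 bits. */
    public static boolean unlimitedStrength() {
        return maxAesKeyLength() > 128;
    }

    /** True if a security provider with this name is registered. */
    public static boolean hasProvider(String name) {
        return Security.getProvider(name) != null;
    }

    public static void main(String[] args) {
        System.out.println("max AES key length: " + maxAesKeyLength());
        System.out.println("unlimited strength: " + unlimitedStrength());
        System.out.println("bouncy castle (BC) registered: " + hasProvider("BC"));
        for (Provider p : Security.getProviders()) {
            System.out.println(p.getName() + ": " + p.getInfo());
        }
    }
}
```

Running it on the build machines with and without the extra JARs would give a baseline to compare against the Kinesis test results.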


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
Closing as #22081 supersedes it.


---




[GitHub] spark pull request #21146: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-13 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/21146


---




[GitHub] spark pull request #22081: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-08-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22081#discussion_r209707448
  
--- Diff: pom.xml ---
@@ -984,24 +987,15 @@
   
 
   
-  
+  
   
-net.java.dev.jets3t
-jets3t
-${jets3t.version}
+javax.activation
+activation
+1.1.1
--- End diff --

the reason for the activation is that it still comes in from somewhere and 
the version pulled in drops from 1.1.1 to 1.1; meaning it'd be an accidental 
downgrade of a JAR. I don't know exactly what uses javax.activation: it is one 
of those historical artifacts whose main role, mapping mime types, is 
potentially used somewhere important. 


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208317741
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -270,11 +269,11 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
 }
   }
 
-  override def createStreamWriter(
+  override def createStreamingWritSupport(
--- End diff --

typo?


---




[GitHub] spark pull request #21146: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-07-06 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21146#discussion_r200596975
  
--- Diff: dev/deps/spark-deps-hadoop-2.6 ---
@@ -21,8 +21,6 @@ automaton-1.11-8.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar
 avro-mapred-1.7.7-hadoop2.jar
-base64-2.3.8.jar
-bcprov-jdk15on-1.58.jar
--- End diff --

if there's nothing else for bouncy castle then it's cleaner for classpaths 
if it goes, as for imports.

Looking at Hadoop, it pulls in bcprov-jdk16, but only for testing, and 
without any explicit calls on bouncy castle APIs that I can see. Presumably 
it's registered as a crypto provider:

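```xml
<dependency>
  <groupId>org.bouncycastle</groupId>
  <artifactId>bcprov-jdk16</artifactId>
  <version>1.46</version>
  <scope>test</scope>
</dependency>
```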

If it is going, I should update this PR & JIRA name to make clear it is gone


---




[GitHub] spark pull request #21146: [SPARK-23654][BUILD] remove jets3t as a dependenc...

2018-07-04 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21146#discussion_r200162894
  
--- Diff: pom.xml ---
@@ -141,7 +141,7 @@
 3.1.5
 1.7.7
 hadoop2
-0.9.4
+1.1.1
--- End diff --

I'll inline. It's not like it's one of those dependencies that anyone doing 
their own build will ever want to change.


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-07-04 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
OK, I've reinstated javax.activation 1.1.1 as an export from spark core 
(over v 1.1), pointed to this JIRA in the comments, and updated the -deps lists 
to remove the others.

Removing jets3t is the right thing to do. It's never going to work again & 
while the AWS SDK is equally troublesome to keep up to date, hadoop-aws's 
move to a shaded JAR removes transitive dependency conflict as a source of 
friction and pain.


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD][WiP] remove jets3t as a dependency ...

2018-07-04 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
BTW, the activation framework (primarily used for some mime type stuff) is 
still being developed, now on github at @javaee: 
[https://github.com/javaee/activation](https://github.com/javaee/activation). 
At least it is being maintained.


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD][WiP] remove jets3t as a dependency ...

2018-07-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
There are usually good reasons for upgrading crypto stuff like bouncy castle; 
nothing to feel bad about.

How about I take this patch & add the explicit activation 1.1.1 ref to 
reinstate it, leave the rest out?


---




[GitHub] spark issue #21146: [SPARK-23654][BUILD][WiP] remove jets3t as a dependency ...

2018-07-02 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
The transitive dependencies are a separate issue. Jets3t pulls in 3 JARs 
which nothing else seems to need, but which transitively go onto the spark CP

downgraded
* javax.activation : could/should be bumped up to 1.1.1 again;

missing
* bcprov-jdk15on-1.58.jar: really part of bouncy castle
* base64-2.3.8.jar: there are so many base-64 encoders on the average 
classpath nobody will be short of that one
* [java-xmlbuilder-1.1.jar](https://mvnrepository.com/artifact/com.jamesmurty.utils/java-xmlbuilder): 
there are newer versions; removing it and letting recipients choose their 
own would be wiser




---




[GitHub] spark issue #21146: [SPARK-23654][BUILD][WiP] remove jets3t as a dependency ...

2018-07-02 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21146
  
It's not going to stop user code from working as the bouncy castle version 
on the classpath means that Jets3t doesn't actually work. 

The fact that nobody has complained about this must count as a metric of 
how many people use jets3t :)

More of an issue is the fact that the httpclient in 2.3+ isn't compatible 
with the AWS SDK in Hadoop 2.8.x. ...


---




[GitHub] spark issue #21588: [SPARK-24590][BUILD] Make Jenkins tests passed with hado...

2018-06-25 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21588
  
There's a technical issue: a trivial change to the case statement

and an ASF process one: the only ASF project which can release hive 
artifacts is the hive team; it's that way due to ASF release indemnity stuff 
(only ASF official releases come with that, and only a project may release its 
own artifacts).

There's not AFAIK any reason the Hive team can't/won't do this, it's just a 
matter of pushing them hard enough to get signoff. @jerryshao has the bigger 
patch though it's not been getting the attention it deserves. 


---




[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...

2018-06-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21257
  
Some overall thoughts

* I think this is only happening on a successful job commit, not abort. 
Is this the desired action?
* If something goes wrong here, is failing the entire job the correct 
action? If the deletes were happening earlier, then yes, the job would 
obviously fail. But now the core work has taken place, it's just cleanup 
failing. Which could be: permissions, transient network, etc. 

I'll have to look a bit closer at what happens in committer cleanups right 
now, though as they are focused on rm -f $dest/__temporary/$jobAttempt, they 
are less worried about failures here as it shouldn't be changing any public 
datasets.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197177156
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
   tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
 }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path,
--- End diff --

No need to worry about concurrent access here, correct? 


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197176461
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -207,9 +210,23 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. 
/table/foo=1)
 val staticPrefixPath = 
qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, 
staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+if (fs.exists(staticPrefixPath)) {
+  if (staticPartitionPrefix.isEmpty && outputCheck) {
+// input contain output, only delete output sub files when job 
commit
+  val files = fs.listFiles(staticPrefixPath, false)
+  while (files.hasNext) {
+val file = files.next()
+if (!committer.deleteWithJob(fs, file.getPath, false)) {
+  throw new IOException(s"Unable to clear output " +
+s"directory ${file.getPath} prior to writing to it")
+}
+  }
+  } else {
+if (!committer.deleteWithJob(fs, staticPrefixPath, true)) {
+  throw new IOException(s"Unable to clear output " +
--- End diff --

again, hard to see how this exception path would be reached.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197176292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -207,9 +210,23 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. 
/table/foo=1)
 val staticPrefixPath = 
qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, 
staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+if (fs.exists(staticPrefixPath)) {
+  if (staticPartitionPrefix.isEmpty && outputCheck) {
+// input contain output, only delete output sub files when job 
commit
+  val files = fs.listFiles(staticPrefixPath, false)
--- End diff --

if there are a lot of files here, you've gone from a dir delete which was 
O(1) on a filesystem, probably O(descendants) on an object store, to 
O(children) on an FS and O(children * descendants(child)) on an object store. Not 
significant for a small number of files, but could potentially be expensive. 
Why do the iteration at all?


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197174835
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -207,9 +210,23 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. 
/table/foo=1)
 val staticPrefixPath = 
qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, 
staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+if (fs.exists(staticPrefixPath)) {
+  if (staticPartitionPrefix.isEmpty && outputCheck) {
+// input contain output, only delete output sub files when job 
commit
+  val files = fs.listFiles(staticPrefixPath, false)
+  while (files.hasNext) {
+val file = files.next()
+if (!committer.deleteWithJob(fs, file.getPath, false)) {
+  throw new IOException(s"Unable to clear output " +
--- End diff --

as `committer.deleteWithJob()` returns true in base class, that check won't 
do much, at least not with the default impl. Probably better just to have 
`deleteWithJob()` return Unit, require callers to raise an exception on a 
delete failure. Given that delete() is required to say "dest doesn't exist if 
you return", I don't think they need to do any checks at all


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197173180
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
   tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
 }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path,
+  canDeleteNow: Boolean = true): Boolean = {
+if (canDeleteNow) {
+  super.deleteWithJob(fs, path)
+} else {
+  val set = if (pathsToDelete.contains(fs)) {
+pathsToDelete(fs)
+  } else {
+new mutable.HashSet[Path]()
+  }
+
+  set.add(path)
+  pathsToDelete.put(fs, set)
+  true
+}
+  }
+
+  private def cleanPathToDelete(): Unit = {
+// first delete the should delete special file
+for (fs <- pathsToDelete.keys) {
+  for (path <- pathsToDelete(fs)) {
+try {
+  if (!fs.delete(path, true)) {
+logWarning(s"Delete path ${path} fail at job commit time")
+  }
+} catch {
+  case ex: IOException =>
+throw new IOException(s"Unable to clear output " +
+s"file ${path} at job commit time", ex)
--- End diff --

recommend including ex.toString() in the new exception raised, as child 
exception text can often get lost
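
A minimal sketch of that wrapping, in plain Java so it stands alone. The class and method names are hypothetical and not part of the patch; the only point is that the cause's `toString()` is repeated in the outer message as well as being chained as the cause.

```java
import java.io.IOException;

/** Hypothetical helper; not part of Spark or Hadoop. */
final class WrapWithCause {
    /**
     * Wraps a delete failure, repeating the cause's toString() in the outer
     * message so the detail survives even if only the top-level message is logged.
     */
    static IOException wrap(String path, IOException cause) {
        return new IOException(
            "Unable to clear output file " + path + " at job commit time: " + cause,
            cause);
    }
}
```

With this, a log line that prints only `getMessage()` of the outer exception still shows the inner exception's class and text.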


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-06-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r197172859
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
   tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
 }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path,
+  canDeleteNow: Boolean = true): Boolean = {
+if (canDeleteNow) {
+  super.deleteWithJob(fs, path)
+} else {
+  val set = if (pathsToDelete.contains(fs)) {
+pathsToDelete(fs)
+  } else {
+new mutable.HashSet[Path]()
+  }
+
+  set.add(path)
+  pathsToDelete.put(fs, set)
+  true
+}
+  }
+
+  private def cleanPathToDelete(): Unit = {
+// first delete the should delete special file
+for (fs <- pathsToDelete.keys) {
+  for (path <- pathsToDelete(fs)) {
+try {
+  if (!fs.delete(path, true)) {
+logWarning(s"Delete path ${path} fail at job commit time")
--- End diff --

delete -> false just means there was nothing there, I wouldn't warn at that 
point. Unless `delete()` throws an exception you assume that when the call 
returns, `fs.exists(path)` does not hold -regardless of the return value. 
(Special exception, the dest is "/")
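
To illustrate the "no exception means it's gone" contract, here is a small sketch using the JDK's `Files.deleteIfExists` as a stand-in. This is an analogy only: it is not the Hadoop `FileSystem.delete(path, recursive)` call, and the helper name is hypothetical. The shared idea is that a `false` result is not an error and needs no warning.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper illustrating "no exception means the path is gone". */
final class QuietDelete {
    /**
     * Deletes a single file or empty directory. The boolean returned by
     * deleteIfExists is deliberately ignored: false only means that nothing
     * was there to delete. Real failures surface as an IOException.
     */
    static void ensureGone(Path path) throws IOException {
        Files.deleteIfExists(path);
    }
}
```

Calling `ensureGone` twice on the same path is harmless; the second call finds nothing and returns quietly.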


---




[GitHub] spark issue #21588: [SPARK-24590][BUILD] Make Jenkins tests passed with hado...

2018-06-20 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21588
  
If jenkins is happy, this is good. 
* Be interesting to see what happens in a build with the 
hadoop-cloud-storage module, if it adds new dependencies
* regarding commons-config, know that todd lipcon has just filed 
[HADOOP-15549](https://issues.apache.org/jira/browse/HADOOP-15549) over a perf 
regression. Not sure what will happen there: rollback vs try to upgrade


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-16 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
@jinxing64 yes, with the detail that some bits of hadoop which parse a 
job attempt like it to be an integer. Some random number used as the upper 
digits of a counter could work; it'd still give meaningful job IDs like 
"45630001" for the first job and "45630002" for the second, for the process which came up with 
"4563" as its prefix. Yes, eventually it'll wrap, but that's integers for you.

BTW, the `newFileAbsPath` code creates the staging dir ".spark-staging-" + 
jobId. Again, a jobID unique across all processes is enough
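
A rough sketch of that numbering scheme, assuming a four-digit random prefix and a four-digit counter. The class name, the digit split and the wrap behaviour are all illustrative choices, not anything in Spark or Hadoop. Note that a random prefix only makes collisions between processes unlikely, not impossible.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper; not part of Spark or Hadoop. */
final class PrefixedJobIds {
    private static final int COUNTER_RANGE = 10_000;   // four low-order decimal digits

    private final int prefix;                          // upper digits, fixed per process
    private final AtomicInteger counter = new AtomicInteger();

    PrefixedJobIds(int prefix) {
        if (prefix < 1 || prefix > 9_999) {
            throw new IllegalArgumentException("prefix must be 1..9999: " + prefix);
        }
        this.prefix = prefix;
    }

    /** Picks a random four-digit prefix, as a driver might at startup. */
    static PrefixedJobIds withRandomPrefix() {
        return new PrefixedJobIds(ThreadLocalRandom.current().nextInt(1_000, 10_000));
    }

    /** Next job ID as an int; the counter part wraps once it passes 9999. */
    int next() {
        int n = Math.floorMod(counter.incrementAndGet(), COUNTER_RANGE);
        return prefix * COUNTER_RANGE + n;
    }
}
```

For a process whose prefix is 4563, the first two calls to `next()` give 45630001 and 45630002, and the result always fits in an `int`.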


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-16 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
@jinxing64 from my reading of the code, the original patch proposed 
creating a temp dir for every query, which could then do its own work & cleanup 
in parallel, with a new meta-commit on each job commit, moving stuff from this 
per-job temp dir into the final dest. 

This is to address
* conflict of work in the `_temporary/0` path
* rm of `_temporary` in job abort, post-commit cleanup

And the reason for that '0' is that spark's job id is just a counter of 
queries done from app start, whereas on hadoop MR it's unique across a live 
YARN cluster. Spark deploys in different ways, and can't rely on that value.

The job id discussion proposes generating unique job IDs for every spark 
app, so allowing `_temporary/$jobID1` to work alongside `_temporary/$jobID2`. 
With that *and* disabling cleanup in the FileOutputCommitter 
(`mapreduce.fileoutputcommitter.cleanup.skipped`), @zheh12 should get what they 
need: parallel queries to same dest using FileOutputCommitter without conflict 
of temp data.

> Thus the change outside committer and doesn't break commiterr's logic. 
Did I understand correctly ?

Exactly. It also makes it a simpler change, which is good as the commit 
algorithms are pretty complex and it's hard to test all the failure modes.


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
> After the job is committed, `skip_dir/tab1/_temporary` will be deleted. 
Then when other jobs attempt to commit, an error will be reported.

I see. Yes, that's 
`org.apache.hadoop.mapreduce.OutputCommitter.cleanupJob()` doing the work. It 
does this as it wants to cleanup all attempts, including predecessors which 
have failed, and expects only one job to be writing at a time.

Like I said, this proposed patch breaks all the blobstore-specific 
committer work, causes problems at scale with HDFS alone, and adds a new 
problem: how do you clean up from failed jobs writing to the same destination? 

It's causing these problems because it's using another layer of temp dir 
and then the rename.

Assuming you only want to work with 
`org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter` and subclasses 
thereof (like the Parquet one), why not

1. Pick up my SPARK-23977 patch and Hadoop 3.1. There are some problems 
with hive versioning there, but that is a WiP of mine.
1. make your own subclass of `FileOutputCommitter` whose `cleanupJob()` 
method doesn't do that full `$dest/_temporary` dir cleanup, just deletes the 
current job ID's subdir
1. Configure the job's (new) committer factory underneath the 
FileOutputFormat to return your committer; do the same for parquet via the 
`BindingParquetOutputCommitter`. 

That way, you get to choose cleanup policy, don't create conflict, don't 
need to rename things. 

There's also the option of providing a MAPREDUCE- patch to add a switch to 
change cleanup to only purge that job's data...you'd need to make sure all 
attempts of that job get cleaned up, as MR can make multiple attempts. There's 
a general fear of going near that class as it's such a critical piece of code, 
but cleanup is not the bit everyone is scared of. Get a change in there and all 
the file output committer subclasses get it. That'd be for Hadoop 3.2 & 2.10; 
no need to change anything in spark other than the job ID problem.


> Meanwhile, due to all applications share the same app appempt id, they 
write temporary data to the same temporary dir `skip_dir/tab1/_temporary/0`. 
Data committed by the successful application is also corrupted.

that's the issue we've been discussing related to job IDs. If each spark 
driver comes up with a unique job ID, that conflict will go away.



---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r188262174
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -163,6 +170,15 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit = {
+// first delete the should delete special file
+for (fs <- pathsToDelete.keys) {
+  for (path <- pathsToDelete(fs)) {
+if (fs.exists(path)) {
+  fs.delete(path, true)
--- End diff --

1. you don't need to do the exists check, it's just overhead. delete() will 
return false if there was nothing to delete.
2. But...what if that delete throws an exception? Should the commit fail 
(as it does now), or should the failure be downgraded to a warning? As an 
example, the hadoop `FileOutputCommitter` uses the option 
`mapreduce.fileoutputcommitter.cleanup-failures.ignored` to choose what to do 
there.
3. ...and: what about cleanup in an abort job?

I think you'd be best off isolating this cleanup into its own method and 
calling it from both job commit & job abort; in job commit discuss with others 
what to do, and in job abort just log & continue.
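
One possible shape for that, sketched in plain Java rather than against the real commit protocol. Everything here is hypothetical: the `Deleter` interface stands in for a filesystem delete call, the warnings list stands in for logging, and the `ignoreCleanupFailures` flag is just one way the commit-time policy could be exposed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of one cleanup routine shared by job commit and job abort. */
final class DeferredCleanup {
    /** Stand-in for a filesystem delete call. */
    interface Deleter {
        void delete(String path) throws IOException;
    }

    private final List<String> pathsToDelete = new ArrayList<>();
    private final List<String> warnings = new ArrayList<>();   // stand-in for logWarning

    void record(String path) {
        pathsToDelete.add(path);
    }

    /** Attempts every recorded delete and returns the failures instead of throwing. */
    private List<IOException> cleanup(Deleter fs) {
        List<IOException> failures = new ArrayList<>();
        for (String path : pathsToDelete) {
            try {
                fs.delete(path);
            } catch (IOException ex) {
                failures.add(new IOException("Unable to delete " + path + ": " + ex, ex));
            }
        }
        pathsToDelete.clear();
        return failures;
    }

    /** Job commit: the caller's policy decides whether a cleanup failure fails the job. */
    void commitJob(Deleter fs, boolean ignoreCleanupFailures) throws IOException {
        List<IOException> failures = cleanup(fs);
        if (!failures.isEmpty() && !ignoreCleanupFailures) {
            throw failures.get(0);
        }
        failures.forEach(f -> warnings.add(f.getMessage()));
    }

    /** Job abort: always log and continue. */
    void abortJob(Deleter fs) {
        cleanup(fs).forEach(f -> warnings.add(f.getMessage()));
    }

    List<String> warnings() {
        return warnings;
    }
}
```

Collecting failures rather than throwing on the first one means every recorded path gets a delete attempt, which seems the safer default for cleanup code.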


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
...this makes me think that the FileOutputCommitter actually has an 
assumption that nobody has called out before, specifically "only one 
application will be writing data to the target FS with the same job id". It's 
probably been implicit in MR with a local HDFS for a long time, first on the 
assumption of all jobs getting unique job Ids from the same central source 
*and* nothing outside the cluster writing to the same destinations. With cloud 
stores, that doesn't hold; it's conceivable that >1  YARN cluster could start 
jobs with the same dest. As the timestamp of YARN launch is used as the initial 
part of the identifier, if >1 cluster was launched in the same minute, things 
are lined up to collide. Oops.

FWIW, the parsing code I mentioned is 
`org.apache.hadoop.mapreduce.JobID.forName()`: any numbering scheme spark 
uses should be able to map from a string to a job ID through that & back again.
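
For reference, the documented string form of a Hadoop job ID looks like `job_200707121733_0003`. The sketch below mimics that form with a stand-alone, hypothetical class so the round trip can be tried without Hadoop on the classpath; it is not the real `JobID` code, and it assumes the identifier part contains no underscore.

```java
import java.util.Locale;

/** Hypothetical stand-in; the real class is org.apache.hadoop.mapreduce.JobID. */
final class JobIdText {
    /** Builds "job_<identifier>_<number>", zero-padding the number to four digits. */
    static String format(String identifier, int number) {
        return String.format(Locale.ROOT, "job_%s_%04d", identifier, number);
    }

    /** Splits the string back into {identifier, number}; rejects anything else. */
    static String[] parse(String text) {
        String[] parts = text.split("_");
        if (parts.length != 3 || !parts[0].equals("job")) {
            throw new IllegalArgumentException("Not a job ID string: " + text);
        }
        Integer.parseInt(parts[2]);   // throws NumberFormatException if not numeric
        return new String[] {parts[1], parts[2]};
    }
}
```

Any ID scheme where the number part is not a plain integer would fail the `parse` step here, which is the kind of breakage to check for against the real parser.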


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
that would work. Like you say, no need to worry about job attempt IDs, just 
uniqueness. If you put the timestamp first, you could still sort the listing by 
time, which might be good for diagnostics.

Some org.apache.hadoop code snippets do attempt to parse the yarn app 
attempt strings into numeric job & task IDs in exactly the way they shouldn't. 
It should already have surfaced if it was a problem in the committer codepaths, 
but it's worth remembering & maybe replicate in the new IDs.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187954805
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -235,4 +247,23 @@ class HadoopMapReduceCommitProtocol(
   tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
 }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path, recursive: 
Boolean,
+canDeleteNow: Boolean = true): Boolean = {
+if (canDeleteNow) {
+  super.deleteWithJob(fs, path, recursive)
+} else {
+  pathsToDelete.add(path -> recursive)
+}
+  }
+
+  private def deletePath(fs: FileSystem, path: Path, recursive: Boolean): 
Unit = {
+if (fs.exists(path) && !fs.delete(path, recursive)) {
+  throw new IOException(s"Unable to clear output " +
+s"directory $path")
+}
--- End diff --

I'd personally ignore a failure on delete(), as the conditions for the API 
call are "if this doesn't raise an exception then the dest is gone". You can 
skip the exists check as it will be superfluous


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187953870
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -163,6 +169,12 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit = {
+// first delete the should delete special file
+val committerFs = 
jobContext.getWorkingDirectory.getFileSystem(jobContext.getConfiguration)
--- End diff --

I'm not sure you can guarantee that the working dir is always the dest FS. 
At least with @rdblue's committers, task attempts work dirs are in file:// & 
task commit (somehow) gets them to the destFS in a form where job commit will 
make them visible.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/21286
  
> cc @steveloughran who I believe is the expert in this area.

I suppose "Stepped through the FileOutputCommitter operations with a debugger 
and a pen and paper" counts, given the complexity there. There are still a lot of 
corner cases which I'm not 100% sure on (or confident that the expectations of 
the job coordinators are met). Otherwise it's more folklore ("we did this because 
job A failed with error..."), plus some experiments with fault injection. I'd 
point to @rdblue as having put in this work too.

* Hadoop MR uses the jobID for unique temp paths, which comes from yarn and is 
guaranteed to be unique within the cluster, at least until everything is 
restarted. See [Hadoop committer 
architecture](http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html)
* And, to handle job restart, it has a temp ID too.

Using a temp dir and then renaming in is ~what the FileOutputCommitter v1 
algorithm does

1. task commit:  
`_temporary/$jobAttemptId/_temporary/$taskID_$taskAttemptID` -> 
`_temporary/$jobAttemptId/$taskID`
2. Job commit: list `_temporary/$jobAttemptId`, move over. This is 
sequential renaming, slow on very large jobs on HDFS &c, where it's O(files), 
performance killer on any object store where it's O(data)
3. The "v2" algorithm avoids this job commit overhead by incrementally 
committing tasks as they complete, so breaking fundamental assumptions about 
observability of output and the ability to recover from failure of tasks and 
jobs.
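
The first two steps can be sketched on a local filesystem as follows. This is a toy model under stated assumptions, not the FileOutputCommitter code: the class and method names are hypothetical, it uses `java.nio` rather than the Hadoop FileSystem API, it only handles flat files directly under each task directory, and it skips cleanup and recovery entirely.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical toy model of the v1 rename-based commit. */
final class V1CommitSketch {
    /** Work dir of one task attempt: _temporary/$jobAttempt/_temporary/$taskAttempt. */
    static Path taskAttemptDir(Path dest, String jobAttempt, String taskAttempt) {
        return dest.resolve("_temporary").resolve(jobAttempt)
                   .resolve("_temporary").resolve(taskAttempt);
    }

    /** Task commit: a single directory rename inside the job attempt dir. */
    static void commitTask(Path dest, String jobAttempt, String task, String taskAttempt)
            throws IOException {
        Path committed = dest.resolve("_temporary").resolve(jobAttempt).resolve(task);
        Files.move(taskAttemptDir(dest, jobAttempt, taskAttempt), committed);
    }

    /** Job commit: list the committed task dirs and rename every file into dest in turn. */
    static void commitJob(Path dest, String jobAttempt) throws IOException {
        Path jobDir = dest.resolve("_temporary").resolve(jobAttempt);
        for (Path taskDir : children(jobDir)) {
            if (taskDir.getFileName().toString().equals("_temporary")) {
                continue;   // uncommitted task attempts stay invisible
            }
            for (Path file : children(taskDir)) {
                Files.move(file, dest.resolve(file.getFileName()));   // one rename per file
            }
        }
    }

    /** Snapshot of a directory listing, taken before anything is moved. */
    private static List<Path> children(Path dir) throws IOException {
        try (Stream<Path> listing = Files.list(dir)) {
            return listing.collect(Collectors.toList());
        }
    }
}
```

The inner loop in `commitJob` is the sequential rename pass: its cost grows with the number of files, and on a store where rename is a copy it grows with the amount of data.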


Adding an extra directory with another rename has some serious issues:

* Completely breaks all the work Ryan and I have done with committers which 
PUT directly into place in S3, where "place" can include specific partitions 
with specific conflict resolution.
* Adds *another* O(files) or O(data) rename process. So it doubles the commit 
time of v1, and for v2 restores the v1 commit overhead, while at least fixing 
the task commit semantics. Essentially: it reinstates v1, just less efficiently.
* Still has that problem of how to handle failure in object stores (S3, 
GCS) which don't do atomic directory rename.

Which is why I think it's the wrong solution.

Normally Spark rejects work to the destination if it's already there, so 
only one job will have a temp dir. This conflict will only be an issue if 
overwrite is allowed, which is going to have other adverse consequences if 
files with the same name are ever created. If the two jobs commit 
simultaneously, you'll get a mixture of results. This is partly why the S3A 
committers insert UUIDs into their filenames by default; the other reason is 
S3's lack of update consistency.

Ignoring that little issue, @cloud-fan is right: giving jobs a unique ID 
should be enough to ensure that FileOutputCommitter does all its work in 
isolation.

Any ID will do, provided it is known to be unique across all work which could 
potentially be writing to the same dest dir at the same time. Hadoop MR has a 
strict ordering requirement so that it can attempt to recover from job attempt 
failures (it looks for `_temporary/$job_id_($job-attempt-id--/` to find 
committed work from the previous attempt). Spark should be able to just create 
a UUID.
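
A minimal sketch of that idea, assuming nothing about how Spark actually wires the ID through: `UniqueJobDirs`, `newJobId` and `stagingDir` are hypothetical names used only for illustration.

```scala
import java.util.UUID

// Hypothetical helpers; not part of Spark or Hadoop.
object UniqueJobDirs {

  // A random UUID has no ordering, which is fine when (unlike Hadoop MR)
  // there is no attempt to recover work from a previous job attempt.
  def newJobId(): String = UUID.randomUUID().toString

  // Per-job staging directory under the shared destination, so that two
  // jobs writing to the same dest dir never share temporary paths.
  def stagingDir(dest: String, jobId: String): String =
    s"$dest/_temporary/$jobId"
}
```

Two jobs writing to the same destination each call `newJobId()` once and keep all their intermediate output under their own `stagingDir`. This isolates only the temporary data; it does nothing about final files which share a name.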

@zheh12: welcome to the world of distributed commit protocols. My writeup 
is 
[here](https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf).
 Also check out Gil Vernik's [Stocator 
paper](https://arxiv.org/abs/1709.01812). Start with those and the source and 
assume we've all made mistakes...

Finally, regarding MAPREDUCE cleanup JIRAs, the most recent is 
[MAPREDUCE-7029](https://issues.apache.org/jira/browse/MAPREDUCE-7029). That 
includes comments from the Google team on their store's behaviour.


---




[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186484550
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+/**
+ * Spark Commit protocol for Path Output Committers.
+ * This committer will work with the `FileOutputCommitter` and subclasses.
+ * All implementations *must* be serializable.
+ *
+ * Rather than ask the `FileOutputFormat` for a committer, it uses the
+ * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
+ * API to create the committer.
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does,
+ * but as [[HadoopMapReduceCommitProtocol]] still uses the original
+ * `org.apache.hadoop.mapred.FileOutputFormat` binding
+ * subclasses do not do this, overrides those subclasses to using the
+ * factory mechanism now supported in the base class.
+ *
+ * In `setupCommitter` the factory is bonded to and the committer for
+ * the destination path chosen.
+ *
+ * @constructor Instantiate. dynamic partition overwrite is not supported,
+ *  so that committers for stores which do not support rename
+ *  will not get confused.
+ * @param jobId job
+ * @param destination   destination
+ * @param dynamicPartitionOverwrite does the caller want support for dynamic
+ *  partition overwrite. If so, it will be
+ *  refused.
+ * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied.
+ */
+class PathOutputCommitProtocol(
+  jobId: String,
+  destination: String,
+  dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(
+jobId,
+destination,
+false) with Serializable {
+
+  @transient var committer: PathOutputCommitter = _
+
+  require(destination != null, "Null destination specified")
+
+  val destPath = new Path(destination)
--- End diff --

I should add that `Path` is serializable in Hadoop 3 (see 
[HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519)); I've added 
a test to round-trip serialization, so as to validate this and prevent 
regressions, and a comment to show it's intentional (and that it's not something 
to copy and paste into Hadoop-2.x compatible code).
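
For reference, a round-trip check of that kind can be sketched with plain Java serialization. This is not the test added to the PR; `RoundTrip` is a hypothetical helper, and the usage below substitutes `java.net.URI` for the Hadoop `Path` so that it runs without Hadoop on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize a value to bytes and read it back.
object RoundTrip {
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A regression test would then assert that `RoundTrip.roundTrip(x) == x`, for example with `new java.net.URI("s3a://bucket/dest")` as `x`; with a Hadoop 3 classpath the same assertion should hold for a `Path`.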


---




[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186482587
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Package object to assist in switching to the Hadoop Hadoop 3
+ * [[org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory]] factory
+ * mechanism for dynamically loading committers for the destination stores.
+ *
+ * = Using Alternative Committers with Spark and Hadoop 3 =
--- End diff --

Yep. It is needed in the docs, with `docs/cloud-integration.md` the obvious 
place. But really, that "build on Hadoop 3.1" is a prereq, isn't it?


---




[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186482016
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+/**
+ * Spark Commit protocol for Path Output Committers.
+ * This committer will work with the `FileOutputCommitter` and subclasses.
+ * All implementations *must* be serializable.
+ *
+ * Rather than ask the `FileOutputFormat` for a committer, it uses the
+ * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
+ * API to create the committer.
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does,
+ * but as [[HadoopMapReduceCommitProtocol]] still uses the original
+ * `org.apache.hadoop.mapred.FileOutputFormat` binding
+ * subclasses do not do this, overrides those subclasses to using the
+ * factory mechanism now supported in the base class.
+ *
+ * In `setupCommitter` the factory is bonded to and the committer for
+ * the destination path chosen.
+ *
+ * @constructor Instantiate. dynamic partition overwrite is not supported,
+ *  so that committers for stores which do not support rename
+ *  will not get confused.
+ * @param jobId job
+ * @param destination   destination
+ * @param dynamicPartitionOverwrite does the caller want support for dynamic
+ *  partition overwrite. If so, it will be
+ *  refused.
+ * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied.
+ */
+class PathOutputCommitProtocol(
+  jobId: String,
+  destination: String,
+  dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(
+jobId,
+destination,
+false) with Serializable {
+
+  @transient var committer: PathOutputCommitter = _
+
+  require(destination != null, "Null destination specified")
+
+  val destPath = new Path(destination)
+
+  logInfo(s"Instantiated committer with job ID=$jobId;" +
+s" destination=$destPath;" +
+s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
+
+  if (dynamicPartitionOverwrite) {
+// until there's explicit extensions to the PathOutputCommitProtocols
+// to support the spark mechanism, it's left to the individual committer
+// choice to handle partitioning.
+throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite")
+  }
+
+  import PathOutputCommitProtocol._
+
+  /**
+   * Set up the committer.
+   * This creates it by talking directly to the Hadoop factories, instead
+   * of the V1 `mapred.FileOutputFormat` methods.
+   * @param context task attempt
+   * @return the committer to use. This will always be a subclass of
+   * [[PathOutputCommitter]].
+   */
+  override protected def setupCommitter(
+context: TaskAttemptContext): PathOutputCommitter = {
+
+logInfo(s"Setting up committer for path $destination")
+committer = PathOutputCommitterFactory.createCommitter(destPath, context)
+
+// Special feature to force out the FileOutputCommitter, so as to 

[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186475370
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@

[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186474730
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@

[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186474501
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@

[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...

2018-05-07 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/21066#discussion_r186474366
  
--- Diff: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
 ---
@@ -0,0 +1,260 @@
