[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202195089 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { --- End diff -- hmm ... not sure that would be better. We have a dilemma here -- the cache should not know about the caller and call
[GitHub] incubator-hawq pull request #1381: Remove autogeneration of version number f...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1381#discussion_r202179965 --- Diff: pxf/gradle/wrapper/gradle-wrapper.properties --- @@ -1,23 +1,6 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -#Wed Aug 05 16:07:21 PDT 2015 +#Mon Jul 09 15:02:34 PDT 2018 --- End diff -- The lack of a license header will trip the license checker unless you add this file to the exclusion list; even better, preserve the license. ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202134716 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -52,69 +62,90 @@ */ @Override public void init(FilterConfig filterConfig) throws ServletException { +//TODO: initialize cache here } /** * If user impersonation is configured, examines the request for the presense of the expected security headers * and create a proxy user to execute further request chain. Responds with an HTTP error if the header is missing * or the chain processing throws an exception. * - * @param request http request + * @param request http request * @param response http response - * @param chain filter chain + * @param chainfilter chain */ @Override -public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException { +public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) +throws IOException, ServletException { if (SecureLogin.isUserImpersonationEnabled()) { // retrieve user header and make sure header is present and is not empty -final String user = ((HttpServletRequest) request).getHeader(USER_HEADER); -if (user == null) { -throw new IllegalArgumentException(MISSING_HEADER_ERROR); -} else if (user.trim().isEmpty()) { -throw new IllegalArgumentException(EMPTY_HEADER_ERROR); +final String gpdbUser = getHeaderValue(request, USER_HEADER); +String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER); +Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true); +Integer fragmentCount = getHeaderValueInt(request, FRAGMENT_COUNT_HEADER, false); +Integer fragmentIndex = getHeaderValueInt(request, FRAGMENT_INDEX_HEADER, false); + +SessionId session = new SessionId(segmentId, transactionId, gpdbUser); +if (LOG.isDebugEnabled() && fragmentCount != null) { 
+LOG.debug(session.toString() + " Fragment = " + fragmentIndex + " of " + fragmentCount); } // TODO refresh Kerberos token when security is enabled -// prepare pivileged action to run on behalf of proxy user +// prepare privileged action to run on behalf of proxy user PrivilegedExceptionAction action = new PrivilegedExceptionAction() { @Override public Boolean run() throws IOException, ServletException { -LOG.debug("Performing request chain call for proxy user = " + user); +LOG.debug("Performing request chain call for proxy user = " + gpdbUser); chain.doFilter(request, response); return true; } }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +UserGroupInformation ugi = cache.getUserGroupInformation(session); try { -LOG.debug("Creating proxy user = " + user); -proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); -proxyUGI.doAs(action); +ugi.doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); } finally { -try { -if (proxyUGI != null) { -LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); -FileSystem.closeAllForUGI(proxyUGI); -} -} catch (Throwable t) { -LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); -} +// Optimization to cleanup the cache if it is the last fragment +boolean forceClean = (fragmentIndex != null && fragmentIndex.equals(fragmentCount)); +
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131530 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; + +timedProxyUGI.r
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132916 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; + +timedProxyUGI.r
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202130797 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; + +timedProxyUGI.r
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202128478 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); --- End diff -- not the best name, IMO increaseRefCount was more meaningful ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126336 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java --- @@ -65,12 +74,15 @@ public int hashCode() { * {@inheritDoc} */ @Override -public boolean equals(Object other) { -if (!(other instanceof SessionId)) { -return false; -} -SessionId that = (SessionId) other; -return this.sessionId.equals(that.sessionId); +public boolean equals(Object obj) { +if (obj == null) return false; +if (obj == this) return true; +if (obj.getClass() != getClass()) return false; + +SessionId that = (SessionId) obj; +return new EqualsBuilder() +.append(sessionId, that.sessionId) +.isEquals(); --- End diff -- yes, agree, my previous comment about EqualsBuilder was about the pattern of comparisons for the top of the method and the helper in case there are multiple fields. Here with only one member, direct comparison should suffice. ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202127945 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); --- End diff -- dial this down to debug for production code ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132529 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; + +timedProxyUGI.r
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126694 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") --- End diff -- do you still need this ? that was needed for arrays, should be ok for maps ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129998 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); --- End diff -- rename variable to cacheEntry ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131246 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; + +timedProxyUGI.r
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133845 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -52,69 +62,90 @@ */ @Override public void init(FilterConfig filterConfig) throws ServletException { +//TODO: initialize cache here } /** * If user impersonation is configured, examines the request for the presence of the expected security headers * and create a proxy user to execute further request chain. Responds with an HTTP error if the header is missing * or the chain processing throws an exception. * - * @param request http request + * @param request http request * @param response http response - * @param chain filter chain + * @param chain filter chain */ @Override -public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException { +public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) +throws IOException, ServletException { if (SecureLogin.isUserImpersonationEnabled()) { // retrieve user header and make sure header is present and is not empty -final String user = ((HttpServletRequest) request).getHeader(USER_HEADER); -if (user == null) { -throw new IllegalArgumentException(MISSING_HEADER_ERROR); -} else if (user.trim().isEmpty()) { -throw new IllegalArgumentException(EMPTY_HEADER_ERROR); +final String gpdbUser = getHeaderValue(request, USER_HEADER); +String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER); +Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true); +Integer fragmentCount = getHeaderValueInt(request, FRAGMENT_COUNT_HEADER, false); +Integer fragmentIndex = getHeaderValueInt(request, FRAGMENT_INDEX_HEADER, false); + +SessionId session = new SessionId(segmentId, transactionId, gpdbUser); +if (LOG.isDebugEnabled() && fragmentCount != null) {
+LOG.debug(session.toString() + " Fragment = " + fragmentIndex + " of " + fragmentCount); } // TODO refresh Kerberos token when security is enabled -// prepare pivileged action to run on behalf of proxy user +// prepare privileged action to run on behalf of proxy user PrivilegedExceptionAction action = new PrivilegedExceptionAction() { @Override public Boolean run() throws IOException, ServletException { -LOG.debug("Performing request chain call for proxy user = " + user); +LOG.debug("Performing request chain call for proxy user = " + gpdbUser); chain.doFilter(request, response); return true; } }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +UserGroupInformation ugi = cache.getUserGroupInformation(session); --- End diff -- I'd call this variable here proxyUGI as this is what we are after ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133483 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -52,69 +62,90 @@ */ @Override public void init(FilterConfig filterConfig) throws ServletException { +//TODO: initialize cache here --- End diff -- please do that ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129124 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,318 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Ticker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they + * have not been accessed for 15 minutes. + * + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and + * destroying a UGI per-request, is wasteful. 
+ */ +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; +private Ticker ticker; +private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + +/** + * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need + * to substitute a mock UGIProvider. + */ +UGICache(UGIProvider provider, Ticker ticker) { +this.ticker = ticker; +this.ugiProvider = provider; +} + +/** + * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to + * create and destroy UserGroupInformation instances. + */ +public UGICache() { +this(new UGIProvider(), Ticker.systemTicker()); +} + +/** + * Create new proxy UGI if not found in cache and increment reference count + */ +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getExpirationQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(session); +Entry entry = cache.get(session); +if (entry == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); +delayQueue.offer(entry); +cache.put(session, entry); +} +entry.acquireReference(); +return entry.getUGI(); +} +} + +/** + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. + * + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
+ */ +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { --- End diff -- cleanImmediatelyIfNoRefs is our interpretation of the fact that we do not expect any more requests from this session, and t
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202124776 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -61,48 +64,51 @@ * create and destroy UserGroupInformation instances. */ public UGICache() { -this(new UGIProvider()); +this(new UGIProvider(), Ticker.systemTicker()); } /** * Create new proxy UGI if not found in cache and increment reference count */ -public Entry getTimedProxyUGI(SessionId session) -throws IOException { - +public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException { Integer segmentId = session.getSegmentId(); String user = session.getUser(); DelayQueue delayQueue = getExpirationQueue(segmentId); synchronized (delayQueue) { // Use the opportunity to cleanup any expired entries -cleanup(segmentId); +cleanup(session); Entry entry = cache.get(session); if (entry == null) { LOG.info(session.toString() + " Creating proxy user = " + user); -entry = new Entry(ugiProvider.createProxyUGI(user), session); +entry = new Entry(ticker, ugiProvider.createProxyUGI(user)); delayQueue.offer(entry); cache.put(session, entry); } entry.acquireReference(); -return entry; +return entry.getUGI(); } } /** - * Decrement reference count for the given Entry. + * Decrement reference count for the given session's UGI. Resets the time at which the UGI will + * expire to 15 minutes in the future. * - * @param timedProxyUGI the cache entry to release - * @param forceClean if true, destroys the UGI for the given Entry (only if it is now unreferenced). + * @param session the session for which we want to release the UGI. + * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is + * now unreferenced). 
*/ -public void release(Entry timedProxyUGI, boolean forceClean) { +public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) { + +Entry timedProxyUGI = cache.get(session); + +if (timedProxyUGI == null) return; --- End diff -- I'd rather not have any assert statements in Java code, if that's what you mean. ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523938 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); --- End diff -- is this relevant anymore since magic number 64 is gone now ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201525176 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +assertNotNull(proxyUGI1.getUGI()); +} + +@Test +public void releaseWithoutForceClean() throws Exception { +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); + +cache.release(proxyUGI1, false); +// UGI wasn't cleaned up, so we can still get it +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session); +asser
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523243 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); --- End diff -- assertSame for ref comparison ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519410 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +106,49 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session); --- End diff -- name it proxyUGIEntry ? since actual UGI is a member of that object ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517606 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICacheEntry.java --- @@ -0,0 +1,94 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICacheEntry implements Delayed { + +private volatile long startTime; +private UserGroupInformation proxyUGI; +private SessionId session; +private boolean cleaned = false; +private AtomicInteger inProgress = new AtomicInteger(); +private static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes --- End diff -- we need to parameterize this ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523091 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); --- End diff -- do we want to assert ref count and expiration time set as a result of such get ? Do we want to see if the principal name under proxy ugi matches the user passed in ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201512843 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java --- @@ -0,0 +1,60 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class SessionId { --- End diff -- please javadoc class and all methods ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524673 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +assertNotNull(proxyUGI1.getUGI()); +} + +@Test +public void releaseWithoutForceClean() throws Exception { +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); + +cache.release(proxyUGI1, false); +// UGI wasn't cleaned up, so we can still get it +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session); +asser
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201520779 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +106,49 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session); try { -LOG.debug("Creating proxy user = " + user); -proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); -proxyUGI.doAs(action); +timedProxyUGI.getUGI().doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); -} finally { -try { -if (proxyUGI != null) { -LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); -FileSystem.closeAllForUGI(proxyUGI); -} -} catch (Throwable t) { -LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); -} +} +finally { +// Optimization to cleanup the cache if it is the last fragment +boolean forceClean = (fragmentIndex != null && fragmentCount.equals(fragmentIndex)); +cache.release(timedProxyUGI, forceClean); } } else { // no user impersonation is configured chain.doFilter(request, response); } } + +private Integer getHeaderValueInt(ServletRequest request, String headerKey, boolean required) +throws IllegalArgumentException { +String value = getHeaderValue(request, headerKey, required); +return value != null ? Integer.valueOf(value) : null; --- End diff -- this will throw NumberFormatException ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524234 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +assertNotNull(proxyUGI1.getUGI()); +} + +@Test +public void releaseWithoutForceClean() throws Exception { +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); + +cache.release(proxyUGI1, false); +// UGI wasn't cleaned up, so we can still get it +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session); +ass
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201515004 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private DelayQueue[] delayQueues = (DelayQueue[])new DelayQueue[64]; +private final UGIProvider ugiProvider; --- End diff -- again, only 1 final, if you bother with final members, then declare others as well, as they will not be changing ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513628 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java --- @@ -0,0 +1,60 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class SessionId { + +private final String user; --- End diff -- why only 1 private field ? There are no setter methods for any of the other members either. ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517214 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private DelayQueue[] delayQueues = (DelayQueue[])new DelayQueue[64]; +private final UGIProvider ugiProvider; + +public UGICache(UGIProvider provider) { +this.ugiProvider = provider; +for (int i = 0; i < delayQueues.length; i++) { +delayQueues[i] = new DelayQueue<>(); +} +} + +public UGICache() { +this(new UGIProvider()); +} + +// Create new proxy UGI if not found in cache and increment reference count +public UGICacheEntry getTimedProxyUGI(SessionId session) +throws IOException { + +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +synchronized (delayQueues[segmentId]) { +// Use the opportunity to cleanup any expired entries +cleanup(segmentId); +UGICacheEntry timedProxyUGI = cache.get(session); +if (timedProxyUGI == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +UserGroupInformation proxyUGI = ugiProvider.createProxyUGI(user); +timedProxyUGI = new UGICacheEntry(proxyUGI, session); +delayQueues[segmentId].offer(timedProxyUGI); +cache.put(session, timedProxyUGI); +} +timedProxyUGI.incrementCounter(); +return timedProxyUGI; +} +} + +// Poll segment expiration queue for all expired entries +// and clean them if possible +private void cleanup(Integer segmentId) { + +UGICacheEntry ugi = null; +while ((ugi = delayQueues[segmentId].poll()) != null) { +// Place it back in the queue if still in use and was not closed +if 
(!closeUGI(ugi)) { +delayQueues[segmentId].offer(ugi); +} +LOG.debug("Delay Queue Size for segment " + --- End diff -- surround with isDebugEnabled ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517046 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private DelayQueue[] delayQueues = (DelayQueue[])new DelayQueue[64]; +private final UGIProvider ugiProvider; + +public UGICache(UGIProvider provider) { +this.ugiProvider = provider; +for (int i = 0; i < delayQueues.length; i++) { +delayQueues[i] = new DelayQueue<>(); +} +} + +public UGICache() { +this(new UGIProvider()); +} + +// Create new proxy UGI if not found in cache and increment reference count +public UGICacheEntry getTimedProxyUGI(SessionId session) +throws IOException { + +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +synchronized (delayQueues[segmentId]) { +// Use the opportunity to cleanup any expired entries +cleanup(segmentId); +UGICacheEntry timedProxyUGI = cache.get(session); +if (timedProxyUGI == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +UserGroupInformation proxyUGI = ugiProvider.createProxyUGI(user); +timedProxyUGI = new UGICacheEntry(proxyUGI, session); +delayQueues[segmentId].offer(timedProxyUGI); +cache.put(session, timedProxyUGI); +} +timedProxyUGI.incrementCounter(); +return timedProxyUGI; +} +} + +// Poll segment expiration queue for all expired entries +// and clean them if possible +private void cleanup(Integer segmentId) { --- End diff -- proper javadoc, please ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201518811 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -42,8 +47,13 @@ private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class); private static final String USER_HEADER = "X-GP-USER"; -private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER); -private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER); +private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID"; +private static final String TRANSACTION_ID_HEADER = "X-GP-XID"; +private static final String FRAGMENT_INDEX_HEADER = "X-GP-FRAGMENT-INDEX"; +private static final String FRAGMENT_COUNT_HEADER = "X-GP-FRAGMENT-COUNT"; +private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request"; +private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request"; +private static UGICache cache = new UGICache(); --- End diff -- this doesn't have to be static, create a single instance in the init() method ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201525708 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +assertNotNull(proxyUGI1.getUGI()); +} + +@Test +public void releaseWithoutForceClean() throws Exception { +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); + +cache.release(proxyUGI1, false); +// UGI wasn't cleaned up, so we can still get it +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session); +asser
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201521673 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,154 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private final Map> queueMap = new HashMap<>(); +private final UGIProvider ugiProvider; + +public UGICache(UGIProvider provider) { +this.ugiProvider = provider; +} + +public UGICache() { +this(new UGIProvider()); +} + +private DelayQueue getDelayQueue(Integer segmentId) { +DelayQueue queue = queueMap.get(segmentId); +if (queue == null) { +synchronized (queueMap) { +queue = queueMap.get(segmentId); +if (queue == null) { +queue = new DelayQueue<>(); +queueMap.put(segmentId, queue); +} +} +} +return queue; +} + +// Create new proxy UGI if not found in cache and increment reference count +public UGICacheEntry getTimedProxyUGI(SessionId session) +throws IOException { + +Integer segmentId = session.getSegmentId(); +String user = session.getUser(); +DelayQueue delayQueue = getDelayQueue(segmentId); +synchronized (delayQueue) { +// Use the opportunity to cleanup any expired entries +cleanup(segmentId); +UGICacheEntry timedProxyUGI = cache.get(session); +if (timedProxyUGI == null) { +LOG.info(session.toString() + " Creating proxy user = " + user); +timedProxyUGI = new UGICacheEntry(ugiProvider.createProxyUGI(user), session); +delayQueue.offer(timedProxyUGI); +cache.put(session, timedProxyUGI); +} +timedProxyUGI.incrementCounter(); +return timedProxyUGI; +} +} + +// Poll segment expiration queue for all expired entries +// and clean them if possible +private void cleanup(Integer segmentId) { + +UGICacheEntry ugi = null; +DelayQueue delayQueue = 
getDelayQueue(segmentId); +while ((ugi = delayQueue.poll()) != null) { +// Place it back in the queue if still in use and was not closed +if (!closeUGI(ugi)) { +delayQueue.offer(ugi); +} +LOG.debug("Delay Queue Size for segment " + +segmentId + " = " + delayQueue.size()); --- End diff -- wrap with isDebugEnabled() ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201516439 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private DelayQueue[] delayQueues = (DelayQueue[])new DelayQueue[64]; +private final UGIProvider ugiProvider; + +public UGICache(UGIProvider provider) { +this.ugiProvider = provider; +for (int i = 0; i < delayQueues.length; i++) { +delayQueues[i] = new DelayQueue<>(); +} +} + +public UGICache() { +this(new UGIProvider()); +} + +// Create new proxy UGI if not found in cache and increment reference count +public UGICacheEntry getTimedProxyUGI(SessionId session) --- End diff -- since we return UGICacheEntry and method is called getTimedProxyUGI, it looks inconsistent ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523390 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); --- End diff -- assert ref count == 2 ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519133 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -64,21 +75,28 @@ public void init(FilterConfig filterConfig) throws ServletException { * @param chain filter chain */ @Override -public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException { +public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) +throws IOException, ServletException { if (SecureLogin.isUserImpersonationEnabled()) { // retrieve user header and make sure header is present and is not empty -final String user = ((HttpServletRequest) request).getHeader(USER_HEADER); -if (user == null) { -throw new IllegalArgumentException(MISSING_HEADER_ERROR); -} else if (user.trim().isEmpty()) { -throw new IllegalArgumentException(EMPTY_HEADER_ERROR); +final String user = getHeaderValue(request, USER_HEADER); +String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER); --- End diff -- why not mandate transaction id and segment id to be not null ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201522755 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { --- End diff -- if you use @RunWith(MockitoJUnitRunner.class) then you can use @Mock annotations, and do not have to verify if you trained the mock with specific values, not any . We discussed it with Scala, which didn't have that support. ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519675 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +106,49 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session); try { -LOG.debug("Creating proxy user = " + user); -proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); -proxyUGI.doAs(action); +timedProxyUGI.getUGI().doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); -} finally { -try { -if (proxyUGI != null) { -LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); -FileSystem.closeAllForUGI(proxyUGI); -} -} catch (Throwable t) { -LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); -} +} +finally { +// Optimization to cleanup the cache if it is the last fragment +boolean forceClean = (fragmentIndex != null && fragmentCount.equals(fragmentIndex)); +cache.release(timedProxyUGI, forceClean); --- End diff -- if release throws exception, we do not want to propagate it to user, so restore catch (Throwable t) block ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523834 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? --- End diff -- user existence is not required for proxy users, remove the method ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513956 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java --- @@ -0,0 +1,60 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class SessionId { + +private final String user; +private Integer segmentId; +private String sessionId; + +public SessionId(Integer segmentId, String transactionId, String gpdbUser) { +this.segmentId = segmentId; +this.user = gpdbUser; +this.sessionId = segmentId + ":" + transactionId + ":" + gpdbUser; --- End diff -- I mentally scope in the other order: user > transaction > segment, so maybe reverse the order of components so it is easier to analyze when debugging / logging ? ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201516265 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { --- End diff -- javadocs please ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524862 --- Diff: pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java --- @@ -0,0 +1,197 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.when; + +public class UGICacheTest { +private UGIProvider provider = null; +private SessionId session = null; +private UGICache cache = null; + +@Before +public void setUp() throws Exception { +provider = mock(UGIProvider.class); + + when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class)); + +session = new SessionId(0, "txn-id", "the-user"); + +cache = new UGICache(provider); +} + +@Test +public void getUGIFromEmptyCache() throws Exception { +UGICacheEntry entry = cache.getTimedProxyUGI(session); +assertNotNull(entry.getUGI()); +verify(provider).createProxyUGI("the-user"); +} + +@Test +public void getSameUGITwiceUsesCache() throws Exception { +UGICacheEntry entry1 = cache.getTimedProxyUGI(session); +UGICacheEntry entry2 = cache.getTimedProxyUGI(session); +assertEquals(entry1, entry2); +assertNotNull(entry1.getUGI()); +verify(provider, times(1)).createProxyUGI("the-user"); +} + +@Test +public void getTwoUGIsWithDifferentSessionsForSameUser() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id-2", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, 
times(2)).createProxyUGI("the-user"); +// TODO: this seems weird. We're creating two UGIs with the same params, +// even though we have two different sessions. Why? +} + +@Test +public void getTwoUGIsWithDifferentUsers() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession); +assertNotEquals(proxyUGI1, proxyUGI2); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception { +SessionId otherSession = new SessionId(0, "txn-id", "different-user"); +UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session); +UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession); +UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession); +assertEquals(proxyUGI1a, proxyUGI1b); +assertEquals(proxyUGI2a, proxyUGI2b); +assertNotEquals(proxyUGI1a, proxyUGI2a); +verify(provider, times(1)).createProxyUGI("the-user"); +verify(provider, times(1)).createProxyUGI("different-user"); +} + +@Test +public void getUGIWhenRequestedUserDoesNotExist() throws Exception { +// what does UserGroupInformation.createProxyUser() do in this scenario? +// how about getLoginUser()? +} + +@Test +public void anySegmentIdIsValid() throws Exception { +session = new SessionId(65, "txn-id", "the-user"); +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); +assertNotNull(proxyUGI1.getUGI()); +} + +@Test +public void releaseWithoutForceClean() throws Exception { +UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session); + +cache.release(proxyUGI1, false); +// UGI wasn't cleaned up, so we can still get it +UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session); +asser
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513391 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java --- @@ -0,0 +1,60 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class SessionId { + +private final String user; +private Integer segmentId; +private String sessionId; + +public SessionId(Integer segmentId, String transactionId, String gpdbUser) { +this.segmentId = segmentId; +this.user = gpdbUser; +this.sessionId = segmentId + ":" + transactionId + ":" + gpdbUser; +} + +public Integer getSegmentId() { +return segmentId; +} + +@Override +public int hashCode() { +return sessionId.hashCode(); +} + +@Override +public boolean equals(Object other) { +if (!(other instanceof SessionId)) { +return false; +} +SessionId that = (SessionId) other; +return this.sessionId.equals(that.sessionId); --- End diff -- I usually follow the pattern and utility methods from: https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/builder/EqualsBuilder.html ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201515880 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + +private static final Log LOG = LogFactory.getLog(UGICache.class); +private Map cache = new ConcurrentHashMap<>(); +@SuppressWarnings("unchecked") +// There is a separate DelayQueue for each segment (also being used for locking) +private DelayQueue[] delayQueues = (DelayQueue[])new DelayQueue[64]; --- End diff -- can we use List backed by ArrayList -- similar performance without hacking the lack of generics support for arrays ---
[GitHub] incubator-hawq pull request #1379: WIP: Cache UGI objects and clean them per...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r199975945 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +188,103 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +TimedProxyUGI timedProxyUGI = getTimedProxyUGI(user, session); try { -LOG.debug("Creating proxy user = " + user); -proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); -proxyUGI.doAs(action); +timedProxyUGI.proxyUGI.doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); -} finally { -try { -if (proxyUGI != null) { -LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); -FileSystem.closeAllForUGI(proxyUGI); -} -} catch (Throwable t) { -LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); -} +} +finally { +release(timedProxyUGI, fragmentIndex, fragmentCount); } } else { // no user impersonation is configured chain.doFilter(request, response); } } + private TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException { +synchronized (session.segmentTransactionId.intern()) { +TimedProxyUGI timedProxyUGI = cache.get(session); +if (timedProxyUGI == null || timedProxyUGI.getDelayMillis() < 0) { +cleanup(); +LOG.info(session.toString() + " Creating proxy user = " + user); +UserGroupInformation proxyUGI = +UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); +timedProxyUGI = new TimedProxyUGI(proxyUGI, session); +delayQueue.offer(timedProxyUGI); +cache.put(session, timedProxyUGI); +} else { 
+timedProxyUGI.incrementCounter(); +} +return timedProxyUGI; +} +} + +private void release(TimedProxyUGI timedProxyUGI, Integer fragmentIndex, Integer fragmentCount) { +synchronized (timedProxyUGI.session.segmentTransactionId.intern()) { +timedProxyUGI.resetTime(); +timedProxyUGI.decrementCounter(); +if (fragmentIndex != null && fragmentCount.equals(fragmentIndex)) +closeUGI(timedProxyUGI); +} +} + +private void cleanup() { +TimedProxyUGI timedProxyUGI = delayQueue.poll(); +while (timedProxyUGI != null) { +closeUGI(timedProxyUGI); +LOG.info(timedProxyUGI.session.toString() + " Delay Queue Size = " + delayQueue.size()); +timedProxyUGI = delayQueue.poll(); +} +} + +private void closeUGI(TimedProxyUGI timedProxyUGI) { +synchronized (timedProxyUGI.session.segmentTransactionId.intern()) { --- End diff -- this can deadlock -- if 2 cleanup attempts happen at the same time for session A and session B -- session A comes first and gets a lock on, say, "seg1:tx1" in release() method and then comes here and tries to get a lock on session B "seg2:tx1", while session B would get that lock already and will, in turn try to get lock on session A. This might happen when new requests for both sessions arrive simultaneously after a delay when their previous UGIs have expired. Granted the probability of this is low, but non-zero, meaning this is dangerous and cannot be ignored. In other words, to prevent deadlocks, do not acquire new locks while holding other locks if another thread can do the same in reverse order, which is exactly the case here. ---
[GitHub] incubator-hawq pull request #1379: WIP: Cache UGI objects and clean them per...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r199952336 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +182,98 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user -UserGroupInformation proxyUGI = null; +TimedProxyUGI timedProxyUGI = getTimedProxyUGI(user, session); try { -LOG.debug("Creating proxy user = " + user); -proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); -proxyUGI.doAs(action); +timedProxyUGI.proxyUGI.doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); -} finally { -try { -if (proxyUGI != null) { -LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); -FileSystem.closeAllForUGI(proxyUGI); -} -} catch (Throwable t) { -LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); -} +} +finally { +release(timedProxyUGI, fragmentIndex, fragmentCount); } } else { // no user impersonation is configured chain.doFilter(request, response); } } + private TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException { +synchronized (session.segmentTransactionId.intern()) { --- End diff -- synchronizing on interned strings might not be a good practice (https://stackoverflow.com/questions/133988/synchronizing-on-string-objects-in-java), basically due to: - possibilities that other logic elsewhere does it as well and creates possibility of deadlock - intern() has cost and might require internal synchronization - static pool of interned instances will grow and then GC will reclaim them, so unclear lifecycle of objects It would seem 
preferable to use local object we have full control over ---
[GitHub] incubator-hawq pull request #1379: WIP: Cache UGI objects and clean them per...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r199577452 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -42,8 +52,51 @@ private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class); private static final String USER_HEADER = "X-GP-USER"; -private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER); -private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER); +private static final String SEGMENT_INDEX_HEADER = "X-GP-SEGMENT-ID"; +private static final String TRANSACTION_ID_HEADER = "X-GP-XID"; +private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request"; +private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request"; +private static Map cache = new ConcurrentHashMap<>(); +private static DelayQueue delayQueue = new DelayQueue<>(); +private static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes + + +private static class TimedProxyUGI implements Delayed { +long startTime; +UserGroupInformation proxyUGI; +SegmentTransactionId session; + +public TimedProxyUGI(UserGroupInformation proxyUGI, SegmentTransactionId session) { +this.startTime = System.currentTimeMillis(); +this.proxyUGI = proxyUGI; +this.session = session; +} + +@Override +public long getDelay(TimeUnit unit) { +return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS); +} + +@Override +public int compareTo(Delayed other) { +return Long.compare(this.getDelayMillis(), ((TimedProxyUGI)other).getDelayMillis()); +} + +private long getDelayMillis() { +return (startTime + UGI_CACHE_EXPIRY) - System.currentTimeMillis(); +} +} + +private static class SegmentTransactionId { +String segmentIndex; --- End diff -- I think that's what we decided on the meeting, even though I don't remember why 
transactionID only is not enough. I believe we decided no need for close() call also. ---
[GitHub] incubator-hawq pull request #1379: WIP: Cache UGI objects and clean them per...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r199576826 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -42,8 +52,51 @@ private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class); private static final String USER_HEADER = "X-GP-USER"; -private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER); -private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER); +private static final String SEGMENT_INDEX_HEADER = "X-GP-SEGMENT-ID"; +private static final String TRANSACTION_ID_HEADER = "X-GP-XID"; +private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request"; +private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request"; +private static Map cache = new ConcurrentHashMap<>(); +private static DelayQueue delayQueue = new DelayQueue<>(); +private static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes --- End diff -- I don't think we have pxf-site anymore, config properties are now managed explicitly via pxf-env.sh and are passed as Java properties via -D parameter to Tomcat. ---
[GitHub] incubator-hawq issue #1329: HAWQ-1579. fix logging issue with null Metadata
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1329 @deem0n -- as this PR has not been auto closed when the code was merged, can you please close it manually ? Thanks. ---
[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1353 @sansanichfb -- can you please approve the PR if it looks good to you ? We are planning to commit it soon. ---
[GitHub] incubator-hawq issue #1344: HAWQ-1599. PXF Ignite plugin
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1344 @kapustor -- we are ready to commit it if you do not plan any further changes. Need to put it through a regression run on our side, hope to be done with it next week. Thanks. ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171052811 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171045985 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171054930 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171053664 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171055350 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171115368 --- Diff: pxf/pxf-service/src/configs/templates/pxf-private-hdp.classpath.template --- @@ -62,6 +63,8 @@ PXF_HOME/lib/pxf-jdbc.jar /usr/hdp/current/hadoop-client/client/snappy-java.jar /usr/hdp/current/hadoop-client/lib/asm-*[0-9].jar /usr/hdp/current/hadoop-client/lib/jersey-server-*[0-9].jar +/usr/hdp/current/hadoop-client/client/gson.jar + --- End diff -- extra line? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171036360 --- Diff: pxf/pxf-ignite/README.md --- @@ -0,0 +1,67 @@ +# Accessing Ignite database using PXF + +The PXF Ignite plug-in enables to access the [Ignite database](https://ignite.apache.org/) (both read and write operations are supported) via REST API. + + +# Prerequisites + +Check the following before using the plug-in: + +* The Ignite plug-in is installed on all PXF nodes; + +* The Ignite client is installed and running at the `IGNITE_HOST` (`localhost` by default; see below), and it accepts http queries from the PXF (note that *enabling Ignite REST API does not require changes in Ignite configuration*; see the instruction on how to do that at https://apacheignite.readme.io/docs/rest-api#section-getting-started). + + +# Syntax + +``` +CREATE [READABLE] EXTERNAL TABLE ( + [, , ...] | LIKE +) +LOCATION ('pxf://?PROFILE=Ignite[&&&...]') +FORMAT 'CUSTOM' (formatter='pxfwritable_import'); +``` +where each `` is one of the following: +* `IGNITE_HOST=`. The location of Ignite client node. If not given, `127.0.0.1:8080` is used by default; +* `IGNITE_CACHE=`. The name of Ignite cache to use. If not given, this parameter is not included in queries from PXF to Ignite, thus Ignite default values will be used (at the moment, this is `Default` cache). This option is **case-sensitive**; +* `BUFFER_SIZE=`. The number of tuples send to (from) Ignite per a response. The same number of tuples is stored in in-plug-in cache. The values `0` and `1` are equal (cache is not used, each tuple is passed in it's own query to Ignite). If not given, `128` is used by default; +* `PARTITION_BY=:`. See below; +* `RANGE=:`. See below; +* `INTERVAL=[:]`. See below. + + +# Partitioning +## Introduction + +PXF Ignite plugin supports simultaneous access to Ignite database from multiple PXF segments. *Partitioning* should be used in order to perform such operation. 
+ +If the partitioning is not used, all the data will be retrieved by a single PXF segment. + +Partitioning in PXF Ignite plug-in works just like in PXF JDBC plug-in. + +This feature is optional. However, a bug in the `pxf-service` which makes partitioning necessary for any query was fixed only on 17th Jan 2018 in [this commit](https://github.com/apache/incubator-hawq/commit/0d620e431026834dd70c9e0d63edf8bb28b38227), so the older versions of PXF may return an exception if a query does not contain a meaningful `PARTITION_BY` parameter. --- End diff -- I don't think we need to include this statement, as this new plugin cannot be built with the older PXF version in the open source. ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171114273 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePlugin.java --- @@ -0,0 +1,90 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +/** + * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection. --- End diff -- copy & paste from JDBC ? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171052327 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171053913 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171042814 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171043644 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171115576 --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml --- @@ -193,4 +193,13 @@ under the License. org.apache.hawq.pxf.plugins.hdfs.ParquetResolver + +Ignite +A profile to read and write data from/to Ignite --- End diff -- should refer to it as Apache Ignite ? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171055669 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { +private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + +// Prepared URLs to send to Ignite when reading data +private String urlReadStart = null; +private String urlReadFetch = null; +private String urlReadClose = null; +// Set to true when Ignite reported all the data for the SELECT query was retreived +private boolean isLastReadFinished = false; +// A buffer to store the SELECT query results (without Ignite metadata) +private LinkedList bufferRead = new LinkedList(); + +// A template for the INSERT +private String queryWrite = null; +// Set to true when the INSERT operation is in progress +private boolean isWriteActive = false; +// A buffer to store prepared values for the INSERT query +private LinkedList bufferWrite = new LinkedList(); + + +/** + * Class constructor. 
+ */ +public IgniteAccessor(InputData inputData) throws UserDataException { +super(inputData); +} + +/** + * openForRead() implementation + */ +@Override +public boolean openForRead() throws Exception { +if (bufferSize == 0) { +bufferSize = 1; +} + +StringBuilder sb = new StringBuilder(); + +// Insert a list of fields to be selected +ArrayList columns = inputData.getTupleDescription(); +if (columns == null) { +throw new UserDataException("Tuple description must be present."); +} +sb.append("SELECT "); +for (int i = 0; i < columns.size(); i++) { +ColumnDescriptor column = columns.get(i); +if (i > 0) { +sb.append(","); +} +sb.append(column.columnName()); +} + +// Insert the name of the table to select values from +sb.append(" FROM "); +String tableName = inputData.getDataSource(); +if (tableName == null) { +throw new UserDataException("Table name must be set as DataSource."); +} +sb.append(tableName); + +// Insert query constraints +// Note: Filter constants may be provided separately from the qu
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171039740 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/ByteUtil.java --- @@ -0,0 +1,77 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * A tool class, used to deal with byte array split and conversions. + * IgnitePartitionFragmenter requires this class in order to work with 'fragmentMetadata' field + */ +public class ByteUtil { +public static byte[][] splitBytes(byte[] bytes, int n) { +int len = bytes.length / n; +byte[][] newBytes = new byte[len][]; +int j = 0; +for (int i = 0; i < len; i++) { +newBytes[i] = new byte[n]; +for (int k = 0; k < n; k++) newBytes[i][k] = bytes[j++]; +} +return newBytes; +} + +public static byte[] getBytes(long value) { --- End diff -- why not use ByteUtils from ApacheCommons ? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171038185 --- Diff: pxf/pxf-ignite/README.md --- @@ -0,0 +1,67 @@ +# Accessing Ignite database using PXF + +The PXF Ignite plug-in enables to access the [Ignite database](https://ignite.apache.org/) (both read and write operations are supported) via REST API. + + +# Prerequisites + +Check the following before using the plug-in: + +* The Ignite plug-in is installed on all PXF nodes; + +* The Ignite client is installed and running at the `IGNITE_HOST` (`localhost` by default; see below), and it accepts http queries from the PXF (note that *enabling Ignite REST API does not require changes in Ignite configuration*; see the instruction on how to do that at https://apacheignite.readme.io/docs/rest-api#section-getting-started). + + +# Syntax + +``` +CREATE [READABLE] EXTERNAL TABLE ( + [, , ...] | LIKE +) +LOCATION ('pxf://?PROFILE=Ignite[&&&...]') +FORMAT 'CUSTOM' (formatter='pxfwritable_import'); +``` +where each `` is one of the following: +* `IGNITE_HOST=`. The location of Ignite client node. If not given, `127.0.0.1:8080` is used by default; +* `IGNITE_CACHE=`. The name of Ignite cache to use. If not given, this parameter is not included in queries from PXF to Ignite, thus Ignite default values will be used (at the moment, this is `Default` cache). This option is **case-sensitive**; +* `BUFFER_SIZE=`. The number of tuples send to (from) Ignite per a response. The same number of tuples is stored in in-plug-in cache. The values `0` and `1` are equal (cache is not used, each tuple is passed in it's own query to Ignite). If not given, `128` is used by default; +* `PARTITION_BY=:`. See below; +* `RANGE=:`. See below; +* `INTERVAL=[:]`. See below. + + +# Partitioning +## Introduction + +PXF Ignite plugin supports simultaneous access to Ignite database from multiple PXF segments. *Partitioning* should be used in order to perform such operation. 
+ +If the partitioning is not used, all the data will be retrieved by a single PXF segment. + +Partitioning in PXF Ignite plug-in works just like in PXF JDBC plug-in. + +This feature is optional. However, a bug in the `pxf-service` which makes partitioning necessary for any query was fixed only on 17th Jan 2018 in [this commit](https://github.com/apache/incubator-hawq/commit/0d620e431026834dd70c9e0d63edf8bb28b38227), so the older versions of PXF may return an exception if a query does not contain a meaningful `PARTITION_BY` parameter. + + +## Syntax + +To use partitions, add a set of ``s: +``` +_BY=:=:[=[:]] +``` + +* The `PARTITION_BY` parameter indicates which column to use as the partition column. Only one column can be used as a partition column. +* The `` is the name of a partition column; +* The `` is the datatype of a partition column. At the moment, the **supported types** are `INT`, `DATE` and `ENUM`. The `DATE` format is `-MM-dd`. + +* The `RANGE` parameter indicates the range of data to be queried. It is left-closed, thus it produces ranges like: +* `[ ; )`, +* `... >= start_value AND ... < end_value`; --- End diff -- would the logic automatically add [-infinity:start_value) and [end_value:infinity) ranges ? If not, how will the data that falls into these intervals be retrieved ? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171114496 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePlugin.java --- @@ -0,0 +1,90 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +/** + * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection. + * Implemented subclasses: {@link IgniteReadAccessor}. + */ +public class IgnitePlugin extends Plugin { +private static final Log LOG = LogFactory.getLog(IgnitePlugin.class); + +// Ignite cache +protected static final String igniteHostDefault = "127.0.0.1:8080"; +protected String igniteHost = null; +// PXF buffer for Ignite data. 
'0' is allowed for INSERT queries +protected static final int bufferSizeDefault = 128; +protected int bufferSize = bufferSizeDefault; +// Ignite cache name +protected String cacheName = null; + +/** + * Parse and check the InputData + * @param inputData + * @throws UserDataException if the request parameter is malformed + */ +public IgnitePlugin(InputData inputData) throws UserDataException { +super(inputData); +if (LOG.isDebugEnabled()) { +LOG.debug("Constructor started"); +} + +igniteHost = inputData.getUserProperty("IGNITE_HOST"); +if (igniteHost == null) { +igniteHost = igniteHostDefault; +} + +cacheName = inputData.getUserProperty("IGNITE_CACHE"); +if (cacheName == null) { --- End diff -- remove this ? ---
[GitHub] incubator-hawq pull request #1344: PXF Ignite plugin
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171036634 --- Diff: pxf/pxf-ignite/README.md --- @@ -0,0 +1,67 @@ +# Accessing Ignite database using PXF + +The PXF Ignite plug-in enables to access the [Ignite database](https://ignite.apache.org/) (both read and write operations are supported) via REST API. + + +# Prerequisites + +Check the following before using the plug-in: + +* The Ignite plug-in is installed on all PXF nodes; + +* The Ignite client is installed and running at the `IGNITE_HOST` (`localhost` by default; see below), and it accepts http queries from the PXF (note that *enabling Ignite REST API does not require changes in Ignite configuration*; see the instruction on how to do that at https://apacheignite.readme.io/docs/rest-api#section-getting-started). + + +# Syntax + +``` +CREATE [READABLE] EXTERNAL TABLE ( + [, , ...] | LIKE +) +LOCATION ('pxf://?PROFILE=Ignite[&&&...]') +FORMAT 'CUSTOM' (formatter='pxfwritable_import'); +``` +where each `` is one of the following: +* `IGNITE_HOST=`. The location of Ignite client node. If not given, `127.0.0.1:8080` is used by default; +* `IGNITE_CACHE=`. The name of Ignite cache to use. If not given, this parameter is not included in queries from PXF to Ignite, thus Ignite default values will be used (at the moment, this is `Default` cache). This option is **case-sensitive**; +* `BUFFER_SIZE=`. The number of tuples send to (from) Ignite per a response. The same number of tuples is stored in in-plug-in cache. The values `0` and `1` are equal (cache is not used, each tuple is passed in it's own query to Ignite). If not given, `128` is used by default; +* `PARTITION_BY=:`. See below; +* `RANGE=:`. See below; +* `INTERVAL=[:]`. See below. + + +# Partitioning +## Introduction + +PXF Ignite plugin supports simultaneous access to Ignite database from multiple PXF segments. *Partitioning* should be used in order to perform such operation. 
+ +If the partitioning is not used, all the data will be retrieved by a single PXF segment. + +Partitioning in PXF Ignite plug-in works just like in PXF JDBC plug-in. --- End diff -- since this is a separate plugin and JDBC partitioning may change independently, it would be good to describe how it works for Ignite here. ---
[GitHub] incubator-hawq pull request #1339: HAWQ-1036. Implement user impersonation i...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1339#discussion_r167117297 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java --- @@ -356,6 +361,20 @@ public float getStatsSampleRatio() { return statsSampleRatio; } +private void parseSecurityProperties() { +// obtain identity of the end-user -- mandatory only when impersonation is enabled +if (SecureLogin.isUserImpersonationEnabled()) { +this.user = getProperty("USER"); +} else { +this.user = getOptionalProperty("USER"); --- End diff -- the property is part of the InputData, so we fill it up anyways. Disabling impersonation disables running the request in the SecurityContext of the user, but user info might be important for some other things (like logging). ---
[GitHub] incubator-hawq pull request #1339: HAWQ-1036. Implement user impersonation i...
GitHub user denalex opened a pull request: https://github.com/apache/incubator-hawq/pull/1339 HAWQ-1036. Implement user impersonation in PXF You can merge this pull request into a Git repository by running: $ git pull https://github.com/denalex/incubator-hawq pxf-user Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1339.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1339 commit d21091bbc8dae125690c0a48cf25708ffba004fb Author: Alexander Denissov <adenissov@...> Date: 2017-12-08T01:17:32Z HAWQ-1036. Implement user impersonation in PXF commit 83c9b660911c3fb2de56fa2559021d7fd700c6cb Author: Alexander Denissov <adenissov@...> Date: 2017-12-13T20:13:11Z substitute protocol version during build commit 4d711b32acc61c53b25041ed7bc0a1354c5bb0d0 Author: Alexander Denissov <adenissov@...> Date: 2017-12-13T20:52:58Z declared variables as final commit 9df7b824ff89f934d84a61aa230b04694cc4ea96 Author: Alexander Denissov <adenissov@...> Date: 2017-12-14T23:04:25Z added parameter to the test commit f6979bd5a06949f04ebd569fe60b9039b1058956 Author: Alexander Denissov <adenissov@...> Date: 2017-12-20T23:02:27Z propagate exception ---
[GitHub] incubator-hawq pull request #1334: HAWQ-1584. Don't ignore exceptions during...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1334#discussion_r166099466 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java --- @@ -143,36 +142,38 @@ private static synchronized Response synchronizedWriteResponse(Bridge bridge, private static Response writeResponse(Bridge bridge, String path, - InputStream inputStream) throws Exception { - -String returnMsg; - + InputStream inputStream) +throws Exception { // Open the output file bridge.beginIteration(); - long totalWritten = 0; +Exception ex = null; // dataStream will close automatically in the end of the try. // inputStream is closed by dataStream.close(). try (DataInputStream dataStream = new DataInputStream(inputStream)) { while (bridge.setNext(dataStream)) { ++totalWritten; } -} catch (ClientAbortException e) { -LOG.debug("Remote connection closed by HAWQ", e); -} catch (Exception ex) { -LOG.debug("totalWritten so far " + totalWritten + " to " + path); -throw ex; +} catch (ClientAbortException cae) { +LOG.error("Remote connection closed by HAWQ", cae); +} catch (Exception e) { +LOG.error("Exception: totalWritten so far " + totalWritten + " to " + path, e); +ex = e; } finally { try { bridge.endIteration(); } catch (Exception e) { -// ignore ... any significant errors should already have been handled +if (ex == null) +ex = e; --- End diff -- Another way would be to preserve throwing the exception inside original catch block (line 162), then here you would say ``` if (ex == null) throw e; else throw ex; ``` and you would not need the block below (lines 170-172) as the original will still be thrown if endIterations() completes without an error. ---
[GitHub] incubator-hawq pull request #1334: Don't ignore exceptions during bridge.end...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1334#discussion_r164928664 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java --- @@ -159,20 +159,16 @@ private static Response writeResponse(Bridge bridge, ++totalWritten; } } catch (ClientAbortException e) { -LOG.debug("Remote connection closed by HAWQ", e); +LOG.error("Remote connection closed by HAWQ", e); } catch (Exception ex) { -LOG.debug("totalWritten so far " + totalWritten + " to " + path); +LOG.error("totalWritten so far " + totalWritten + " to " + path); throw ex; } finally { -try { -bridge.endIteration(); -} catch (Exception e) { -// ignore ... any significant errors should already have been handled -} +bridge.endIteration(); --- End diff -- I do not see exception being logged: `LOG.error("totalWritten so far " + totalWritten + " to " + path);` and only the latest one will be returned to client, not the original which potentially caused the issue ---
[GitHub] incubator-hawq pull request #1334: Don't ignore exceptions during bridge.end...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1334#discussion_r164913643 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java --- @@ -159,20 +159,16 @@ private static Response writeResponse(Bridge bridge, ++totalWritten; } } catch (ClientAbortException e) { -LOG.debug("Remote connection closed by HAWQ", e); +LOG.error("Remote connection closed by HAWQ", e); } catch (Exception ex) { -LOG.debug("totalWritten so far " + totalWritten + " to " + path); +LOG.error("totalWritten so far " + totalWritten + " to " + path); throw ex; } finally { -try { -bridge.endIteration(); -} catch (Exception e) { -// ignore ... any significant errors should already have been handled -} +bridge.endIteration(); --- End diff -- if Exception happened in try and is rethrown by catch and also endIteration throws an exception, then the original one will be forgotten and the latest one from endIteration will be thrown. If this is ok, we should at least log at error level the original one in catch block. Alternatively, have an error flag set by catch and then ignore exception here only if the flag is set to have the original one propagate to the client. ---
[GitHub] incubator-hawq pull request #1334: Don't ignore exceptions during bridge.end...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1334#discussion_r164912937 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java --- @@ -159,20 +159,16 @@ private static Response writeResponse(Bridge bridge, ++totalWritten; } } catch (ClientAbortException e) { -LOG.debug("Remote connection closed by HAWQ", e); +LOG.error("Remote connection closed by HAWQ", e); --- End diff -- this was not considered an error, for some reason, we are not throwing exception in this case, so maybe warning level ? ---
[GitHub] incubator-hawq pull request #1332: HAWQ-1581. Separate PXF system parameters...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1332#discussion_r163696192 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java --- @@ -82,19 +83,26 @@ public ProtocolData(Map<String, String> paramsMap) { parseTupleDescription(); /* - * accessor - will throw exception from getPropery() if outputFormat is + * accessor - will throw exception if outputFormat is * BINARY and the user did not supply accessor=... or profile=... - * resolver - will throw exception from getPropery() if outputFormat is + * resolver - will throw exception if outputFormat is * BINARY and the user did not supply resolver=... or profile=... */ -profile = getOptionalProperty("PROFILE"); +profile = getUserProperty("PROFILE"); if (profile != null) { setProfilePlugins(); } -accessor = getProperty("ACCESSOR"); -resolver = getProperty("RESOLVER"); -fragmenter = getOptionalProperty("FRAGMENTER"); -metadata = getOptionalProperty("METADATA"); +accessor = getUserProperty("ACCESSOR"); +if(accessor == null) { +protocolViolation(accessor); --- End diff -- what's the point of sending null to protocolViolation function ? would you want to send string instead ? ---
[GitHub] incubator-hawq pull request #1332: HAWQ-1581. Separate PXF system parameters...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1332#discussion_r163695957 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java --- @@ -82,19 +83,26 @@ public ProtocolData(Map<String, String> paramsMap) { parseTupleDescription(); /* - * accessor - will throw exception from getPropery() if outputFormat is + * accessor - will throw exception if outputFormat is * BINARY and the user did not supply accessor=... or profile=... - * resolver - will throw exception from getPropery() if outputFormat is + * resolver - will throw exception if outputFormat is * BINARY and the user did not supply resolver=... or profile=... */ -profile = getOptionalProperty("PROFILE"); +profile = getUserProperty("PROFILE"); --- End diff -- but you repeat the same code multiple times here -- throwing error if the property is not present, which makes it a mandatory "user property". ---
[GitHub] incubator-hawq pull request #1332: HAWQ-1581. Separate PXF system parameters...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1332#discussion_r163433718 --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java --- @@ -58,8 +58,8 @@ public JdbcPlugin(InputData input) throws UserDataException { super(input); jdbcDriver = input.getUserProperty("JDBC_DRIVER"); dbUrl = input.getUserProperty("DB_URL"); -user = input.getUserProperty("USER"); -pass = input.getUserProperty("PASS"); +user = input.getUserProperty("DB_USER"); +pass = input.getUserProperty("DB_PASS"); --- End diff -- no longer necessary to change these property names ---
[GitHub] incubator-hawq pull request #1332: HAWQ-1581. Separate PXF system parameters...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1332#discussion_r163433442 --- Diff: pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/ProfilesConfTest.java --- @@ -96,16 +96,18 @@ public void definedProfile() throws Exception { optionalFile.toURI().toURL()); Map<String, String> hbaseProfile = ProfilesConf.getProfilePluginsMap("HBase"); +System.out.println(hbaseProfile); +System.out.println(hbaseProfile.get("X-GP-OPTIONS-PLUGIN1")); --- End diff -- Did you intend to have System.out here? ---
[GitHub] incubator-hawq pull request #1332: HAWQ-1581. Separate PXF system parameters...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1332#discussion_r163434200 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java --- @@ -82,19 +83,26 @@ public ProtocolData(Map<String, String> paramsMap) { parseTupleDescription(); /* - * accessor - will throw exception from getPropery() if outputFormat is + * accessor - will throw exception if outputFormat is * BINARY and the user did not supply accessor=... or profile=... - * resolver - will throw exception from getPropery() if outputFormat is + * resolver - will throw exception if outputFormat is * BINARY and the user did not supply resolver=... or profile=... */ -profile = getOptionalProperty("PROFILE"); +profile = getUserProperty("PROFILE"); --- End diff -- should we preserve semantic of getUserProperty() that throws exception if the property is not present and getUserOptionalProperty() that returns null ? ---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308210 --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java --- @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) { * @param fsp file split to be serialized * @return byte serialization of fsp * @throws IOException if I/O errors occur while writing to the underlying - * stream + * stream */ public static byte[] prepareFragmentMetadata(FileSplit fsp) throws IOException { -ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream(); -ObjectOutputStream objectStream = new ObjectOutputStream( -byteArrayStream); -objectStream.writeLong(fsp.getStart()); -objectStream.writeLong(fsp.getLength()); -objectStream.writeObject(fsp.getLocations()); + +return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations()); + +} + +public static byte[] prepareFragmentMetadata(long start, long length, String[] locations) --- End diff -- or better to incorporate 2 lines from this function into the parent function, if it only is used once. ---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1326#discussion_r159306278 --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java --- @@ -0,0 +1,103 @@ +package org.apache.hawq.pxf.plugins.hdfs; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ParquetDataFragmenter extends Fragmenter { +private Job job; + +public ParquetDataFragmenter(InputData md) { +super(md); +JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class); +try { +job = Job.getInstance(jobConf); +} catch (IOException e) { +throw new RuntimeException("Unable to instantiate a job for reading fragments", e); +} +} + + +@Override +public List getFragments() throws Exception { +String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource()); +ArrayList splits = getSplits(new Path(absoluteDataPath)); --- End diff -- usually best to declare type as List, especially since there is no direct access calls that would justify narrowing this to ArrayList ---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308579 --- Diff: pxf/pxf-service/src/scripts/pxf-env.sh --- @@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO} # Parent directory of Hadoop client installation (optional) # used in case of tarball-based installation when all clients are under a common parent directory export HADOOP_ROOT=${HADOOP_ROOT} + +export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" --- End diff -- this is for debugging, should not be committed ! ---
[GitHub] incubator-hawq pull request #1322: Support Hive OpenCSVSerde
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1322#discussion_r156481646 --- Diff: pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java --- @@ -43,7 +43,7 @@ public HiveLineBreakAccessor(InputData input) throws Exception { super(input, new TextInputFormat()); ((TextInputFormat) inputFormat).configure(jobConf); -HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE); +HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE, PXF_HIVE_SERDES.CSV_SERDE); --- End diff -- Why do we need to keep a list of supported Serdes? If the Serde is transparent to us, let us support all of them and not restrict users to specific ones. ---
[GitHub] incubator-hawq issue #1319: HAWQ-1563. Adapt to run Makefile in /bin/sh
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1319 Committed: https://github.com/apache/incubator-hawq/commit/9578ab04c80d0d0cc1d93e8a8cde98f9e14b6e7d, closing PR ---
[GitHub] incubator-hawq pull request #1306: HAWQ-1543. Make pxf configurable upon res...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1306#discussion_r148685296 --- Diff: pxf/pxf-service/src/scripts/pxf-env.sh --- @@ -41,6 +41,9 @@ fi # Port export PXF_PORT=${PXF_PORT:-51200} +# Memory +export PXF_HEAP_OPTS="-Xmx2g -Xms1g" --- End diff -- I would call these PXF_JVM_OPTS since there can be other options here (like GC setting, etc) ---
[GitHub] incubator-hawq pull request #1306: HAWQ-1543. Make pxf configurable upon res...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1306#discussion_r148685123 --- Diff: pxf/pxf-service/src/scripts/pxf-service --- @@ -249,6 +206,34 @@ function patchWebapp() cat $web_file | \ sed "s:.*pxf-log4j.properties<\/param-value>:$PXF_HOME\/conf\/pxf-log4j.properties<\/param-value>:" > web.xml.tmp mv web.xml.tmp $web_file + +# set port + catalinaProperties=$instance_root/$instance_name/conf/catalina.properties +cat $catalinaProperties | \ +sed "s|^[[:blank:]]*connector.http.port=.*$|connector.http.port=$instance_port|g" \ +> ${catalinaProperties}.tmp + +rm $catalinaProperties +mv ${catalinaProperties}.tmp $catalinaProperties --- End diff -- can we just copy from a template and "sed" in place instead of all these file manipulations ? ---
[GitHub] incubator-hawq pull request #1306: HAWQ-1543. Make pxf configurable upon res...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1306#discussion_r148684991 --- Diff: pxf/pxf-service/src/scripts/pxf-service --- @@ -216,17 +174,16 @@ function doInit() determineHadoopDistro generatePrivateClasspath || return 1 createInstance || return 1 - configureInstance || return 1 deployWebapp || return 1 createLogsDir || return 1 createRunDir || return 1 } # -# patchWebapp patches the webapp config files -# patch applied only if PXF_HOME is defined +# configureWebapp patches the webapp with pxf and user overriden configs +# applied only if PXF_HOME is defined # -function patchWebapp() +function configureWebapp() --- End diff -- since this function is now called on every start, we should not be unpacking WAR file and edit web.xml -- only catalina files that use dynamic properties should be regenerated. Ideally, they should be regenerated from templates that do not get destroyed, so that customers have a chance to change the template, if needed. ---
[GitHub] incubator-hawq pull request #1305: HAWQ-1542. PXF Demo profile should suppor...
Github user denalex closed the pull request at: https://github.com/apache/incubator-hawq/pull/1305 ---
[GitHub] incubator-hawq issue #1305: HAWQ-1542. PXF Demo profile should support write...
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1305 Merged: https://github.com/apache/incubator-hawq/commit/fe33faaba9e4f0422240014c415d1b6a999db5f8 ---
[GitHub] incubator-hawq pull request #1305: HAWQ-1542. PXF Demo profile should suppor...
GitHub user denalex opened a pull request: https://github.com/apache/incubator-hawq/pull/1305 HAWQ-1542. PXF Demo profile should support write use case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/denalex/incubator-hawq pxf-demo-write Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1305.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1305 commit c619c57830e3c2cef0832180c1878290101183cd Author: Alexander Denissov <adenis...@pivotal.io> Date: 2017-10-26T18:44:10Z initial version of DemoWritable commit dea31e9980b738c3f69de30eae3d8f9fd7524de0 Author: Alexander Denissov <adenis...@pivotal.io> Date: 2017-10-30T21:42:17Z writeable logic ---
[GitHub] incubator-hawq pull request #1304: HAWQ-1541. PXF configs shouldn't be part ...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1304#discussion_r147462998 --- Diff: pxf/build.gradle --- @@ -197,14 +197,9 @@ project('pxf-service') { tasks.war { archiveName = 'pxf.war' processResources { -filesMatching('**/pxf-private*.classpath') { +filesMatching('**/pxf-*') { --- End diff -- so we are not shipping any resources within WAR, no default profiles, classpath files or log4j, correct ? And we sure make install or RPM packaging places the needed files into proper places ? ---
[GitHub] incubator-hawq pull request #1304: HAWQ-1541. PXF configs shouldn't be part ...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1304#discussion_r147467878 --- Diff: pxf/Makefile --- @@ -19,13 +19,13 @@ default: all ifneq "$(HD)" "" -BUILD_PARAMS= -Dhd=$(HD) +BUILD_PARAMS+= -Dhd=$(HD) else ifneq "$(PXF_HOME)" "" -BUILD_PARAMS= -DdeployPath="$(PXF_HOME)" +BUILD_PARAMS+= -DdeployPath="$(PXF_HOME)" else ifneq "$(GPHOME)" "" PXF_HOME= "$(GPHOME)/pxf" -BUILD_PARAMS= -DdeployPath="$(PXF_HOME)" +BUILD_PARAMS+= -DdeployPath="$(PXF_HOME)" --- End diff -- minor ;) -- you can move the BUILD_PARAMS line outside of if/else since it's the same line in both places. ---
[GitHub] incubator-hawq issue #1303: HAWQ-1540. PXF should not accept parameters for ...
Github user denalex commented on the issue: https://github.com/apache/incubator-hawq/pull/1303 Committed: commit d2d3eaf508ef0aabcca28b73faa910f343b392fc Author: Alexander Denissov <adenis...@pivotal.io> Date: Wed Oct 18 13:57:03 2017 -0700 HAWQ-1540. PXF should not accept parameters for init command ---
[GitHub] incubator-hawq pull request #1303: HAWQ-1540. PXF should not accept paramete...
Github user denalex closed the pull request at: https://github.com/apache/incubator-hawq/pull/1303 ---
[GitHub] incubator-hawq pull request #1303: HAWQ-1540. PXF should not accept paramete...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1303#discussion_r146398562 --- Diff: pxf/pxf-service/src/configs/templates/pxf-private-hdp.classpath.template --- @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## +# This file contains the internal classpaths required to run PXF. --- End diff -- fixed ---
[GitHub] incubator-hawq pull request #1303: HAWQ-1540. PXF should not accept paramete...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1303#discussion_r146398591 --- Diff: pxf/pxf-service/src/configs/templates/pxf-private-cdh.classpath.template --- @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## +# This file contains the internal classpaths required to run PXF. --- End diff -- fixed ---
[GitHub] incubator-hawq pull request #1303: HAWQ-1540. PXF should not accept paramete...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1303#discussion_r146398533 --- Diff: pxf/pxf-service/src/configs/templates/pxf-private-tar.classpath.template --- @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## +# This file contains the internal classpaths required to run PXF. --- End diff -- fixed ---
[GitHub] incubator-hawq pull request #1303: HAWQ-1540. PXF should not accept paramete...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1303#discussion_r146362612 --- Diff: pxf/pxf-service/src/scripts/pxf-service --- @@ -292,38 +288,90 @@ function createRunDir() return 0 } +function check_hadoop_install() +{ +local distro_type=${1} +case "${distro_type}" in +hdp|HDP) +if [ -d "/usr/hdp/current/hadoop-client/client" ]; then +DISTRO="hdp" +return 0; +fi +;; +cdh|CDH) +if [ -d "/usr/lib/hadoop/client" ]; then +DISTRO="cdh" +return 0; +fi +;; +tar|TAR) +if [ -n "${HADOOP_ROOT}" ] && [ -d "${HADOOP_ROOT}/hadoop/share/hadoop/common/lib" ]; then +DISTRO="tar" +return 0; +fi +;; +custom|CUSTOM) +# use tarball template for custom distro, do not require HADOOP_ROOT to be set +DISTRO="tar" +return 0; +;; +*) +fail "Unknown hadoop distribution type: HADOOP_DISTRO=${distro_type}" +;; +esac +# the distro type was not found installed, return failure code --- End diff -- good point, I wanted to do that too, but this function is called repeatedly from the logic below and in some cases it might go via HDP / CDH/ TAR calls consecutively, so erring out is not an option here and I believe informing user about failures of intermediate steps would be confusing. ---