http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java new file mode 100644 index 0000000..131bcde --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java @@ -0,0 +1,1176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.attribute.expression.language; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.apache.nifi.attribute.expression.language.Query.Range; +import org.apache.nifi.attribute.expression.language.evaluation.QueryResult; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.antlr.runtime.tree.Tree; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestQuery { + + @Test + public void testCompilation() { + assertInvalid("${attr:uuid()}"); + assertInvalid("${attr:indexOf(length())}"); + assertValid("${UUID()}"); + assertInvalid("${UUID():nextInt()}"); + assertValid("${nextInt()}"); + assertValid("${now():format('yyyy/MM/dd')}"); + assertInvalid("${attr:times(3)}"); + assertValid("${attr:toNumber():multiply(3)}"); + assertValid("${hostname()}"); + assertValid("${literal(3)}"); + // left here because it's convenient for looking at the output + //System.out.println(Query.compile("").evaluate(null)); + } + + private void assertValid(final String query) { + try { + Query.compile(query); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail("Expected query to be valid, but it failed to compile due to " + e); + } + } + + private void assertInvalid(final String query) { + try { + Query.compile(query); + Assert.fail("Expected query to be invalid, but it 
did compile"); + } catch (final Exception e) { + } + } + + @Test + public void testIsValidExpression() { + Query.validateExpression("${abc:substring(${xyz:length()})}", false); + Query.isValidExpression("${now():format('yyyy-MM-dd')}"); + + try { + Query.validateExpression("$${attr}", false); + Assert.fail("invalid query validated"); + } catch (final AttributeExpressionLanguageParsingException e) { + } + + Query.validateExpression("$${attr}", true); + + Query.validateExpression("${filename:startsWith('T8MTXBC')\n" + + ":or( ${filename:startsWith('C4QXABC')} )\n" + + ":or( ${filename:startsWith('U6CXEBC')} )" + + ":or( ${filename:startsWith('KYM3ABC')} )}", false); + } + + @Test + public void testCompileEmbedded() { + final String expression = "${x:equals( ${y} )}"; + final Query query = Query.compile(expression); + final Tree tree = query.getTree(); + System.out.println(printTree(tree)); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("x", "x"); + attributes.put("y", "x"); + final String result = Query.evaluateExpressions(expression, attributes, null); + assertEquals("true", result); + + Query.validateExpression(expression, false); + } + + private String printTree(final Tree tree) { + final StringBuilder sb = new StringBuilder(); + printTree(tree, 0, sb); + + return sb.toString(); + } + + private void printTree(final Tree tree, final int spaces, final StringBuilder sb) { + for (int i = 0; i < spaces; i++) { + sb.append(" "); + } + + if (tree.getText().trim().isEmpty()) { + sb.append(tree.toString()).append("\n"); + } else { + sb.append(tree.getText()).append("\n"); + } + + for (int i = 0; i < tree.getChildCount(); i++) { + printTree(tree.getChild(i), spaces + 2, sb); + } + } + + @Test + public void testEscape() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "My Value"); + attributes.put("${xx}", "hello"); + + assertEquals("My Value", evaluateQueryForEscape("${attr}", attributes)); + 
assertEquals("${attr}", evaluateQueryForEscape("$${attr}", attributes)); + assertEquals("$My Value", evaluateQueryForEscape("$$${attr}", attributes)); + assertEquals("$${attr}", evaluateQueryForEscape("$$$${attr}", attributes)); + assertEquals("$$My Value", evaluateQueryForEscape("$$$$${attr}", attributes)); + } + + @Test + public void testWithBackSlashes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("x", "C:\\test\\1.txt"); + attributes.put("y", "y\ny"); + + final String query = "${x:substringAfterLast( '/' ):substringAfterLast( '\\\\' )}"; + verifyEquals(query, attributes, "1.txt"); + attributes.put("x", "C:/test/1.txt"); + verifyEquals(query, attributes, "1.txt"); + + verifyEquals("${y:equals('y\\ny')}", attributes, Boolean.TRUE); + } + + @Test + public void testWithTicksOutside() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "My Value"); + + assertEquals(1, Query.extractExpressionRanges("\"${attr}").size()); + assertEquals(1, Query.extractExpressionRanges("'${attr}").size()); + assertEquals(1, Query.extractExpressionRanges("'${attr}'").size()); + assertEquals(1, Query.extractExpressionRanges("${attr}").size()); + + assertEquals("'My Value'", Query.evaluateExpressions("'${attr}'", attributes, null)); + assertEquals("'My Value", Query.evaluateExpressions("'${attr}", attributes, null)); + } + + @Test + @Ignore("Depends on TimeZone") + public void testDateToNumber() { + final Query query = Query.compile("${dateTime:toDate('yyyy/MM/dd HH:mm:ss.SSS'):toNumber()}"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("dateTime", "2013/11/18 10:22:27.678"); + + final QueryResult<?> result = query.evaluate(attributes); + assertEquals(ResultType.NUMBER, result.getResultType()); + assertEquals(1384788147678L, result.getValue()); + } + + @Test + public void testAddOneDayToDate() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("dateTime", "2013/11/18 
10:22:27.678"); + + verifyEquals("${dateTime:toDate('yyyy/MM/dd HH:mm:ss.SSS'):toNumber():plus(86400000):toDate():format('yyyy/MM/dd HH:mm:ss.SSS')}", attributes, "2013/11/19 10:22:27.678"); + verifyEquals("${dateTime:toDate('yyyy/MM/dd HH:mm:ss.SSS'):plus(86400000):format('yyyy/MM/dd HH:mm:ss.SSS')}", attributes, "2013/11/19 10:22:27.678"); + } + + @Test + @Ignore("Requires specific locale") + public void implicitDateConversion() { + final Date date = new Date(); + final Query query = Query.compile("${dateTime:format('yyyy/MM/dd HH:mm:ss.SSS')}"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("dateTime", date.toString()); + + // the date.toString() above will end up truncating the milliseconds. So remove millis from the Date before + // formatting it + final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US); + final long millis = date.getTime() % 1000L; + final Date roundedToNearestSecond = new Date(date.getTime() - millis); + final String formatted = sdf.format(roundedToNearestSecond); + + final QueryResult<?> result = query.evaluate(attributes); + assertEquals(ResultType.STRING, result.getResultType()); + assertEquals(formatted, result.getValue()); + } + + @Test + public void testEmbeddedExpressionsAndQuotes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("x", "abc"); + attributes.put("a", "abc"); + + verifyEquals("${x:equals(${a})}", attributes, true); + + Query.validateExpression("${x:equals('${a}')}", false); + assertEquals("true", Query.evaluateExpressions("${x:equals('${a}')}", attributes, null)); + + Query.validateExpression("${x:equals(\"${a}\")}", false); + assertEquals("true", Query.evaluateExpressions("${x:equals(\"${a}\")}", attributes, null)); + } + + @Test + public void testJoin() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a.a", "a"); + attributes.put("a.b", "b"); + attributes.put("a.c", "c"); + 
verifyEquals("${allAttributes( 'a.a', 'a.b', 'a.c' ):join(', ')}", attributes, "a, b, c"); + verifyEquals("${x:join(', ')}", attributes, ""); + verifyEquals("${a.a:join(', ')}", attributes, "a"); + verifyEquals("${allAttributes( 'x', 'y' ):join(',')}", attributes, ","); + } + + @Test(expected = AttributeExpressionLanguageException.class) + public void testCannotCombineWithNonReducingFunction() { + Query.compileTree("${allAttributes( 'a.1' ):plus(1)}"); + } + + @Test + public void testIsEmpty() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "a"); + attributes.put("b", ""); + attributes.put("c", " \n"); + + verifyEquals("${a:isEmpty()}", attributes, false); + verifyEquals("${b:isEmpty()}", attributes, true); + verifyEquals("${c:isEmpty()}", attributes, true); + verifyEquals("${d:isEmpty()}", attributes, true); + } + + @Test + public void testReplaceEmpty() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "a"); + attributes.put("b", ""); + attributes.put("c", " \n"); + + verifyEquals("${a:replaceEmpty('c')}", attributes, "a"); + verifyEquals("${b:replaceEmpty('c')}", attributes, "c"); + verifyEquals("${c:replaceEmpty('c')}", attributes, "c"); + verifyEquals("${d:replaceEmpty('c')}", attributes, "c"); + } + + @Test + public void testCount() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "a"); + attributes.put("b", "abc"); + attributes.put("c", " \n"); + attributes.put("n1", "111"); + attributes.put("n2", "222"); + attributes.put("n3", "333333"); + + verifyEquals("${allMatchingAttributes( '.*' ):count()}", attributes, 6L); + verifyEquals("${allMatchingAttributes( '.*' ):length():gt(2):count()}", attributes, 5L); + verifyEquals("${allMatchingAttributes( 'n.*' ):plus(1):count()}", attributes, 3L); + } + + @Test + public void testCurlyBracesInQuotes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "My Valuee"); + + assertEquals("Val", 
evaluateQueryForEscape("${attr:replaceAll('My (Val)ue{1,2}', '$1')}", attributes)); + assertEquals("Val", evaluateQueryForEscape("${attr:replaceAll(\"My (Val)ue{1,2}\", '$1')}", attributes)); + } + + private String evaluateQueryForEscape(final String queryString, final Map<String, String> attributes) { + final FlowFile mockFlowFile = Mockito.mock(FlowFile.class); + Mockito.when(mockFlowFile.getAttributes()).thenReturn(attributes); + Mockito.when(mockFlowFile.getId()).thenReturn(1L); + Mockito.when(mockFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis()); + Mockito.when(mockFlowFile.getSize()).thenReturn(1L); + Mockito.when(mockFlowFile.getLineageIdentifiers()).thenReturn(new HashSet<String>()); + Mockito.when(mockFlowFile.getLineageStartDate()).thenReturn(System.currentTimeMillis()); + return Query.evaluateExpressions(queryString, mockFlowFile); + } + + @Test + public void testGetAttributeValue() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "My Value"); + verifyEquals("${attr}", attributes, "My Value"); + } + + @Test + public void testGetAttributeValueEmbedded() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "XX "); + attributes.put("XX", "My Value"); + verifyEquals("${${attr:trim()}}", attributes, "My Value"); + } + + @Test + public void testSimpleSubstring() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "My Value"); + verifyEquals("${attr:substring(2, 5)}", attributes, " Va"); + } + + @Test + public void testCallToFunctionWithSubjectResultOfAnotherFunctionCall() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", " My Value "); + verifyEquals("${attr:trim():substring(2, 5)}", attributes, " Va"); + } + + @Test + public void testProblematic1() { + // There was a bug that prevented this expression from compiling. This test just verifies that it now compiles. 
+ final String queryString = "${xx:append( \"120101\" ):toDate( 'yyMMddHHmmss' ):format( \"yy-MM-ddâTâHH:mm:ss\") }"; + Query.compile(queryString); + } + + @Test + public void testEquals() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", " XX "); + verifyEquals("${attr:trim():equals('XX')}", attributes, true); + } + + @Test + public void testDeeplyEmbedded() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("x", "false"); + attributes.put("abc", "a"); + attributes.put("a", "a"); + + verifyEquals("${x:or( ${${abc}:length():equals(1)} )}", attributes, true); + } + + @Test + public void testExtractExpressionRanges() { + assertEquals(29, Query.extractExpressionRanges("${hello:equals( $${goodbye} )}").get(0).getEnd()); + + List<Range> ranges = Query.extractExpressionRanges("hello"); + assertTrue(ranges.isEmpty()); + + ranges = Query.extractExpressionRanges("${hello"); + assertTrue(ranges.isEmpty()); + + ranges = Query.extractExpressionRanges("hello}"); + assertTrue(ranges.isEmpty()); + + ranges = Query.extractExpressionRanges("$${hello"); + assertTrue(ranges.isEmpty()); + + ranges = Query.extractExpressionRanges("$he{ll}o"); + assertTrue(ranges.isEmpty()); + + ranges = Query.extractExpressionRanges("${hello}"); + assertEquals(1, ranges.size()); + Range range = ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(7, range.getEnd()); + + ranges = Query.extractExpressionRanges("${hello:equals( ${goodbye} )}"); + assertEquals(1, ranges.size()); + range = ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(28, range.getEnd()); + + ranges = Query.extractExpressionRanges("${hello:equals( $${goodbye} )}"); + assertEquals(1, ranges.size()); + range = ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(29, range.getEnd()); + + ranges = Query.extractExpressionRanges("${hello:equals( $${goodbye} )} or just hi, ${bob:or(${jerry})}"); + assertEquals(2, ranges.size()); + range = 
ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(29, range.getEnd()); + + range = ranges.get(1); + assertEquals(43, range.getStart()); + assertEquals(61, range.getEnd()); + + ranges = Query.extractExpressionRanges("${hello:equals( ${goodbye} )} or just hi, ${bob}, are you ${bob.age:toNumber()} yet? $$$${bob}"); + assertEquals(3, ranges.size()); + range = ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(28, range.getEnd()); + + range = ranges.get(1); + assertEquals(42, range.getStart()); + assertEquals(47, range.getEnd()); + + range = ranges.get(2); + assertEquals(58, range.getStart()); + assertEquals(78, range.getEnd()); + + ranges = Query.extractExpressionRanges("${x:matches( '.{4}' )}"); + assertEquals(1, ranges.size()); + range = ranges.get(0); + assertEquals(0, range.getStart()); + assertEquals(21, range.getEnd()); + } + + @Test + public void testExtractExpressionTypes() { + final List<ResultType> types = Query.extractResultTypes("${hello:equals( ${goodbye} )} or just hi, ${bob}, are you ${bob.age:toNumber()} yet? 
$$$${bob}"); + assertEquals(3, types.size()); + assertEquals(ResultType.BOOLEAN, types.get(0)); + assertEquals(ResultType.STRING, types.get(1)); + assertEquals(ResultType.NUMBER, types.get(2)); + } + + @Test + public void testEqualsEmbedded() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("x", "hello"); + attributes.put("y", "good-bye"); + + verifyEquals("${x:equals( ${y} )}", attributes, false); + + attributes.put("y", "hello"); + verifyEquals("${x:equals( ${y} )}", attributes, true); + + attributes.put("x", "4"); + attributes.put("y", "3"); + attributes.put("z", "1"); + attributes.put("h", "100"); + verifyEquals("${x:toNumber():lt( ${y:toNumber():plus( ${h:toNumber()} )} )}", attributes, true); + verifyEquals("${h:toNumber():ge( ${y:toNumber():plus( ${z:toNumber()} )} )}", attributes, true); + verifyEquals("${x:toNumber():equals( ${y:toNumber():plus( ${z:toNumber()} )} )}", attributes, true); + + attributes.put("x", "88"); + verifyEquals("${x:toNumber():gt( ${y:toNumber():plus( ${z:toNumber()} )} )}", attributes, true); + + attributes.put("y", "88"); + assertEquals("true", Query.evaluateExpressions("${x:equals( '${y}' )}", attributes, null)); + } + + @Test + public void testComplicatedEmbeddedExpressions() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("fox", "quick, brown"); + attributes.put("dog", "lazy"); + + verifyEquals("${fox:substring( ${ 'dog' :substring(2):length()}, 5 ):equals( 'ick' )}", attributes, true); + verifyEquals("${fox:substring( ${ 'dog' :substring(2):length()}, 5 ):equals( 'ick' )}", attributes, true); + } + + @Test + public void testQuotingQuotes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("xx", "say 'hi'"); + + String query = "${xx:replaceAll( \"'.*'\", '\\\"hello\\\"' )}"; + verifyEquals(query, attributes, "say \"hello\""); + + query = "${xx:replace( \"'\", '\"')}"; + verifyEquals(query, attributes, "say \"hi\""); + + query = "${xx:replace( 
'\\'', '\"')}"; + System.out.println(query); + verifyEquals(query, attributes, "say \"hi\""); + } + + @Test + public void testDoubleQuotesWithinSingleQuotes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("xx", "say 'hi'"); + + final String query = "${xx:replace( \"'hi'\", '\\\"hello\\\"' )}"; + System.out.println(query); + verifyEquals(query, attributes, "say \"hello\""); + } + + @Test + public void testEscapeQuotes() { + final long timestamp = 1403620278642L; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("date", String.valueOf(timestamp)); + + final String format = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + final String query = "startDateTime=\"${date:toNumber():toDate():format(\"" + format + "\")}\""; + final String result = Query.evaluateExpressions(query, attributes, null); + + final String expectedTime = new SimpleDateFormat(format, Locale.US).format(timestamp); + assertEquals("startDateTime=\"" + expectedTime + "\"", result); + + final List<Range> ranges = Query.extractExpressionRanges(query); + assertEquals(1, ranges.size()); + } + + @Test + public void testDateConversion() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("date", "1403620278642"); + + verifyEquals("${date:format('yyyy')}", attributes, "2014"); + verifyEquals("${date:toDate():format('yyyy')}", attributes, "2014"); + verifyEquals("${date:toNumber():format('yyyy')}", attributes, "2014"); + verifyEquals("${date:toNumber():toDate():format('yyyy')}", attributes, "2014"); + verifyEquals("${date:toDate():toNumber():format('yyyy')}", attributes, "2014"); + verifyEquals("${date:toDate():toNumber():toDate():toNumber():toDate():toNumber():format('yyyy')}", attributes, "2014"); + } + + @Test + public void testSingleLetterAttribute() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("A", "0123456789"); + + verifyEquals("${A}", attributes, "0123456789"); + verifyEquals("${'A'}", attributes, 
"0123456789"); + } + + @Test + public void testImplicitConversions() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("A", "0123456789"); + attributes.put("b", "true"); + attributes.put("c", "false"); + attributes.put("d", "Quick Brown Fox"); + attributes.put("F", "-48"); + attributes.put("n", "2014/04/04 00:00:00"); + + final Calendar cal = Calendar.getInstance(); + cal.set(Calendar.YEAR, 2014); + cal.set(Calendar.MONTH, 3); + cal.set(Calendar.DAY_OF_MONTH, 4); + cal.set(Calendar.HOUR, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 45); + + final String dateString = cal.getTime().toString(); + attributes.put("z", dateString); + + verifyEquals("${A:plus(4)}", attributes, 123456793L); + verifyEquals("${A:plus( ${F} )}", attributes, 123456741L); + + verifyEquals("${F:lt( ${A} )}", attributes, true); + verifyEquals("${A:substring(2,3):plus(21):substring(1,2):plus(0)}", attributes, 3L); + verifyEquals("${n:format( 'yyyy' )}", attributes, "2014"); + verifyEquals("${z:format( 'yyyy' )}", attributes, "2014"); + + attributes.put("n", "2014/04/04 00:00:00.045"); + verifyEquals("${n:format( 'yyyy' ):append(','):append( ${n:format( 'SSS' )} )}", attributes, "2014,045"); + } + + @Test + public void testNewLinesAndTabsInQuery() { + final String query = "${ abc:equals('abc'):or( \n\t${xx:isNull()}\n) }"; + assertEquals(ResultType.BOOLEAN, Query.getResultType(query)); + Query.validateExpression(query, false); + assertEquals("true", Query.evaluateExpressions(query)); + } + + @Test + public void testAttributeReferencesWithWhiteSpace() { + final Map<String, String> attrs = new HashMap<>(); + attrs.put("a b c,d", "abc"); + + final String query = "${ 'a b c,d':equals('abc') }"; + verifyEquals(query, attrs, true); + } + + @Test + public void testComments() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + + final String expression + = "# hello, world\n" + + "${# ref attr\n" + + "\t" + + "abc" + + 
"\t" + + "#end ref attr\n" + + "}"; + + Query query = Query.compile(expression); + QueryResult<?> result = query.evaluate(attributes); + assertEquals(ResultType.STRING, result.getResultType()); + assertEquals("xyz", result.getValue()); + + query = Query.compile("${abc:append('# hello') #good-bye \n}"); + result = query.evaluate(attributes); + assertEquals(ResultType.STRING, result.getResultType()); + assertEquals("xyz# hello", result.getValue()); + } + + @Test + public void testAppendPrepend() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "XX"); + attributes.put("YXXX", "bingo"); + + verifyEquals("${${attr:append('X'):prepend('Y')}}", attributes, "bingo"); + } + + @Test + public void testIsNull() { + final Map<String, String> attributes = new HashMap<>(); + verifyEquals("${attr:isNull()}", attributes, true); + } + + @Test + public void testNotNull() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", ""); + + verifyEquals("${attr:notNull()}", attributes, true); + } + + @Test + public void testIsNullOrLengthEquals0() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", ""); + attributes.put("xyz", "xyz"); + attributes.put("xx", " "); + + verifyEquals("${abc:isNull():or( ${abc:length():equals(0)} )}", attributes, true); + verifyEquals("${xyz:isNull():or( ${xyz:length():equals(0)} )}", attributes, false); + verifyEquals("${none:isNull():or( ${none:length():equals(0)} )}", attributes, true); + verifyEquals("${xx:isNull():or( ${xx:trim():length():equals(0)} )}", attributes, true); + } + + @Test + public void testReplaceNull() { + final Map<String, String> attributes = new HashMap<>(); + verifyEquals("${attr:replaceNull('hello')}", attributes, "hello"); + } + + @Test + public void testReplace() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "hello"); + verifyEquals("${attr:replace('hell', 'yell')}", attributes, "yello"); + } + + 
@Test + public void testReplaceAll() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "hello"); + attributes.put("xyz", "00-00TEST.2014_01_01_000000_value"); + + verifyEquals("${xyz:replaceAll(\"^([^.]+)\\.([0-9]{4})_([0-9]{2})_([0-9]{2}).*$\", \"$3\")}", attributes, "01"); + verifyEquals("${attr:replaceAll('l+', 'r')}", attributes, "hero"); + + attributes.clear(); + attributes.put("filename1", "abc.gz"); + attributes.put("filename2", "abc.g"); + attributes.put("filename3", "abc.gz.gz"); + attributes.put("filename4", "abc.gz.g"); + attributes.put("abc", "hello world"); + + verifyEquals("${filename3:replaceAll('\\\\\\.gz$', '')}", attributes, "abc.gz.gz"); + verifyEquals("${filename3:replaceAll('\\\\\\\\.gz$', '')}", attributes, "abc.gz.gz"); + verifyEquals("${filename1:replaceAll('\\.gz$', '')}", attributes, "abc"); + verifyEquals("${filename2:replaceAll('\\.gz$', '')}", attributes, "abc.g"); + verifyEquals("${filename4:replaceAll('\\\\.gz$', '')}", attributes, "abc.gz.g"); + + verifyEquals("${abc:replaceAll( 'lo wor(ld)', '$0')}", attributes, "hello world"); + verifyEquals("${abc:replaceAll( 'he(llo) world', '$1')}", attributes, "llo"); + verifyEquals("${abc:replaceAll( 'xx', '$0')}", attributes, "hello world"); + verifyEquals("${abc:replaceAll( '(xx)', '$1')}", attributes, "hello world"); + verifyEquals("${abc:replaceAll( 'lo wor(ld)', '$1')}", attributes, "helld"); + + } + + @Test + public void testReplaceAllWithOddNumberOfBackslashPairs() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "C:\\temp\\.txt"); + + verifyEquals("${filename:replace('\\\\', '/')}", attributes, "C:/temp/.txt"); + verifyEquals("${filename:replaceAll('\\\\\\\\', '/')}", attributes, "C:/temp/.txt"); + verifyEquals("${filename:replaceAll('\\\\\\.txt$', '')}", attributes, "C:\\temp"); + } + + @Test + public void testReplaceAllWithMatchingGroup() { + final Map<String, String> attributes = new HashMap<>(); + 
attributes.put("attr", "hello"); + + verifyEquals("${attr:replaceAll('.*?(l+).*', '$1')}", attributes, "ll"); + } + + @Test + public void testMathOperations() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("one", "1"); + attributes.put("two", "2"); + attributes.put("three", "3"); + attributes.put("four", "4"); + attributes.put("five", "5"); + attributes.put("hundred", "100"); + + verifyEquals("${hundred:toNumber():multiply(2):divide(3):plus(1):mod(5)}", attributes, 2L); + } + + @Test + public void testIndexOf() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("attr", "https://abc.go"); + verifyEquals("${attr:indexOf('/')}", attributes, 6L); + } + + @Test + public void testDate() { + final Calendar now = Calendar.getInstance(); + final int year = now.get(Calendar.YEAR); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("entryDate", String.valueOf(now.getTimeInMillis())); + + verifyEquals("${entryDate:toNumber():toDate():format('yyyy')}", attributes, String.valueOf(year)); + + attributes.clear(); + attributes.put("month", "3"); + attributes.put("day", "4"); + attributes.put("year", "2013"); + assertEquals("63", Query.evaluateExpressions("${year:append('/'):append(${month}):append('/'):append(${day}):toDate('yyyy/MM/dd'):format('D')}", attributes, null)); + assertEquals("63", Query.evaluateExpressions("${year:append('/'):append('${month}'):append('/'):append('${day}'):toDate('yyyy/MM/dd'):format('D')}", attributes, null)); + + verifyEquals("${year:append('/'):append(${month}):append('/'):append(${day}):toDate('yyyy/MM/dd'):format('D')}", attributes, "63"); + } + + @Test + public void testSystemProperty() { + System.setProperty("hello", "good-bye"); + assertEquals("good-bye", Query.evaluateExpressions("${hello}")); + assertEquals("good-bye", Query.compile("${hello}").evaluate().getValue()); + } + + @Test + public void testAnyAttribute() { + final Map<String, String> attributes = new 
HashMap<>(); + attributes.put("abc", "zzz"); + attributes.put("xyz", "abc"); + + verifyEquals("${anyAttribute('abc', 'xyz', 'missingAttr'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyAttribute('abc', 'xyz'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyAttribute('xyz', 'abc'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyAttribute('zz'):substring(1,2):equals('b')}", attributes, false); + verifyEquals("${anyAttribute('abc', 'zz'):isNull()}", attributes, true); + } + + @Test + public void testAnyMatchingAttribute() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "zzz"); + attributes.put("xyz", "abc"); + attributes.put("123.cba", "hello"); + + verifyEquals("${anyMatchingAttribute('.{2}x', '.{2}z'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyMatchingAttribute('.*'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyMatchingAttribute('x{44}'):substring(1,2):equals('b')}", attributes, false); + verifyEquals("${anyMatchingAttribute('abc'):substring(1,2):equals('b')}", attributes, false); + verifyEquals("${anyMatchingAttribute('xyz'):substring(1,2):equals('b')}", attributes, true); + verifyEquals("${anyMatchingAttribute('xyz'):notNull()}", attributes, true); + verifyEquals("${anyMatchingAttribute('xyz'):isNull()}", attributes, false); + verifyEquals("${anyMatchingAttribute('xxxxxxxxx'):notNull()}", attributes, false); + verifyEquals("${anyMatchingAttribute('123\\.c.*'):matches('hello')}", attributes, true); + verifyEquals("${anyMatchingAttribute('123\\.c.*|a.c'):matches('zzz')}", attributes, true); + } + + @Test + public void testAnyDelineatedValue() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "a,b,c"); + attributes.put("xyz", "abc"); + + final String query = "${anyDelineatedValue('${abc}', ','):equals('b')}"; + assertEquals(ResultType.BOOLEAN, Query.getResultType(query)); + + 
assertEquals("true", Query.evaluateExpressions(query, attributes, null)); + assertEquals("true", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('a')}", attributes, null)); + assertEquals("true", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('c')}", attributes, null)); + assertEquals("false", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('d')}", attributes, null)); + + verifyEquals("${anyDelineatedValue(${abc}, ','):equals('b')}", attributes, true); + verifyEquals("${anyDelineatedValue(${abc}, ','):equals('a')}", attributes, true); + verifyEquals("${anyDelineatedValue(${abc}, ','):equals('c')}", attributes, true); + verifyEquals("${anyDelineatedValue(${abc}, ','):equals('d')}", attributes, false); + } + + @Test + public void testAllDelineatedValues() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "a,b,c"); + attributes.put("xyz", "abc"); + + final String query = "${allDelineatedValues('${abc}', ','):matches('[abc]')}"; + + assertEquals(ResultType.BOOLEAN, Query.getResultType(query)); + assertEquals("true", Query.evaluateExpressions(query, attributes, null)); + assertEquals("true", Query.evaluateExpressions(query, attributes, null)); + assertEquals("false", Query.evaluateExpressions("${allDelineatedValues('${abc}', ','):matches('[abd]')}", attributes, null)); + assertEquals("false", Query.evaluateExpressions("${allDelineatedValues('${abc}', ','):equals('a'):not()}", attributes, null)); + + verifyEquals("${allDelineatedValues(${abc}, ','):matches('[abc]')}", attributes, true); + verifyEquals("${allDelineatedValues(${abc}, ','):matches('[abd]')}", attributes, false); + verifyEquals("${allDelineatedValues(${abc}, ','):equals('a'):not()}", attributes, false); + } + + @Test + public void testAllAttributes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "1234"); + attributes.put("xyz", "4132"); + attributes.put("hello", 
"world!"); + + verifyEquals("${allAttributes('abc', 'xyz'):matches('\\d+')}", attributes, true); + verifyEquals("${allAttributes('abc', 'xyz'):toNumber():lt(99999)}", attributes, true); + verifyEquals("${allAttributes('abc', 'hello'):length():gt(3)}", attributes, true); + verifyEquals("${allAttributes('abc', 'hello'):length():equals(4)}", attributes, false); + verifyEquals("${allAttributes('abc', 'xyz'):length():equals(4)}", attributes, true); + verifyEquals("${allAttributes('abc', 'xyz', 'other'):isNull()}", attributes, false); + + try { + Query.compile("${allAttributes('#ah'):equals('hello')"); + Assert.fail("Was able to compile with allAttributes and an invalid attribute name"); + } catch (final AttributeExpressionLanguageParsingException e) { + // expected behavior + } + } + + @Test + public void testMathOperators() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "1234"); + attributes.put("xyz", "4132"); + attributes.put("hello", "world!"); + + verifyEquals("${xyz:toNumber():gt( ${abc:toNumber()} )}", attributes, true); + } + + @Test + public void testAllMatchingAttributes() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "1234"); + attributes.put("xyz", "4132"); + attributes.put("hello", "world!"); + attributes.put("123.cba", "hell.o"); + + System.out.println(printTree(Query.compile("${allMatchingAttributes('(abc|xyz)'):matches('\\\\d+')}").getTree())); + + verifyEquals("${'123.cba':matches('hell\\.o')}", attributes, true); + verifyEquals("${allMatchingAttributes('123\\.cba'):equals('hell.o')}", attributes, true); + verifyEquals("${allMatchingAttributes('(abc|xyz)'):matches('\\d+')}", attributes, true); + verifyEquals("${allMatchingAttributes('[ax].*'):toNumber():lt(99999)}", attributes, true); + verifyEquals("${allMatchingAttributes('hell.'):length():gt(3)}", attributes, true); + + verifyEquals("${allMatchingAttributes('123\\.cba'):equals('no')}", attributes, false); + } + + @Test + 
public void testMatches() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "1234xyz4321"); + attributes.put("end", "xyz"); + attributes.put("xyz", "4132"); + attributes.put("hello", "world!"); + attributes.put("dotted", "abc.xyz"); + + final String evaluated = Query.evaluateExpressions("${abc:matches('1234${end}4321')}", attributes, null); + assertEquals("true", evaluated); + + attributes.put("end", "888"); + final String secondEvaluation = Query.evaluateExpressions("${abc:matches('1234${end}4321')}", attributes, null); + assertEquals("false", secondEvaluation); + + verifyEquals("${dotted:matches('abc\\.xyz')}", attributes, true); + } + + @Test + public void testFind() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "1234xyz4321"); + attributes.put("end", "xyz"); + attributes.put("xyz", "4132"); + attributes.put("hello", "world!"); + attributes.put("dotted", "abc.xyz"); + + final String evaluated = Query.evaluateExpressions("${abc:find('1234${end}4321')}", attributes, null); + assertEquals("true", evaluated); + + attributes.put("end", "888"); + final String secondEvaluation = Query.evaluateExpressions("${abc:find('${end}4321')}", attributes, null); + assertEquals("false", secondEvaluation); + + verifyEquals("${dotted:find('\\.')}", attributes, true); + } + + @Test + public void testSubstringAfter() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "file-255"); + + verifyEquals("${filename:substringAfter('')}", attributes, "file-255"); + verifyEquals("${filename:substringAfterLast('')}", attributes, "file-255"); + verifyEquals("${filename:substringBefore('')}", attributes, "file-255"); + verifyEquals("${filename:substringBeforeLast('')}", attributes, "file-255"); + verifyEquals("${filename:substringBefore('file')}", attributes, ""); + + attributes.put("uri", "sftp://some.uri"); + verifyEquals("${uri:substringAfter('sftp')}", attributes, "://some.uri"); + } + 
+ @Test + public void testSubstringAfterLast() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "file-file-255"); + + verifyEquals("${filename:substringAfterLast('file-')}", attributes, "255"); + verifyEquals("${filename:substringAfterLast('5')}", attributes, ""); + verifyEquals("${filename:substringAfterLast('x')}", attributes, "file-file-255"); + } + + @Test + public void testSubstringBefore() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("something", "some {} or other"); + + verifyEquals("${something:substringBefore('}')}", attributes, "some {"); + } + + @Test + public void testSubstring() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "file-255"); + + verifyEquals("${filename:substring(1, 2)}", attributes, "i"); + verifyEquals("${filename:substring(4)}", attributes, "-255"); + } + + @Test + public void testToRadix() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "file-255"); + attributes.put("filename2", "file-99999"); + + verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(16):toUpper()}", attributes, "FF"); + verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(16, 4):toUpper()}", attributes, "00FF"); + verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(36, 3):toUpper()}", attributes, "073"); + } + + @Test + public void testDateFormatConversion() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("blue", "20130917162643"); + verifyEquals("${blue:toDate('yyyyMMddHHmmss'):format(\"yyyy/MM/dd HH:mm:ss.SSS'Z'\")}", attributes, "2013/09/17 16:26:43.000Z"); + } + + @Test + public void testNot() { + verifyEquals("${ab:notNull():not()}", new HashMap<String, String>(), true); + } + + @Test + public void testAttributesWithSpaces() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("ab", "abc"); + attributes.put("a b", "abc"); + + 
verifyEquals("${ab}", attributes, "abc"); + verifyEquals("${'a b'}", attributes, "abc"); + verifyEquals("${'a b':replaceNull('')}", attributes, ""); + } + + @Test + public void testOr() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename1", "xabc"); + attributes.put("filename2", "yabc"); + attributes.put("filename3", "abcxy"); + + verifyEquals("${filename1:startsWith('x'):or(true)}", attributes, true); + verifyEquals("${filename1:startsWith('x'):or( ${filename1:startsWith('y')} )}", attributes, true); + verifyEquals("${filename2:startsWith('x'):or( ${filename2:startsWith('y')} )}", attributes, true); + verifyEquals("${filename3:startsWith('x'):or( ${filename3:startsWith('y')} )}", attributes, false); + verifyEquals("${filename1:startsWith('x'):or( ${filename2:startsWith('y')} )}", attributes, true); + verifyEquals("${filename2:startsWith('x'):or( ${filename1:startsWith('y')} )}", attributes, false); + } + + @Test + public void testAnd() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename1", "xabc"); + attributes.put("filename2", "yabc"); + attributes.put("filename 3", "abcxy"); + + verifyEquals("${filename1:startsWith('x'):and(true)}", attributes, true); + verifyEquals("${filename1:startsWith('x') : and( false )}", attributes, false); + verifyEquals("${filename1:startsWith('x'):and( ${filename1:startsWith('y')} )}", attributes, false); + verifyEquals("${filename2:startsWith('x'):and( ${filename2:startsWith('y')} )}", attributes, false); + verifyEquals("${filename3:startsWith('x'):and( ${filename3:startsWith('y')} )}", attributes, false); + verifyEquals("${filename1:startsWith('x'):and( ${filename2:startsWith('y')} )}", attributes, true); + verifyEquals("${filename2:startsWith('x'):and( ${filename1:startsWith('y')} )}", attributes, false); + verifyEquals("${filename1:startsWith('x'):and( ${'filename 3':endsWith('y')} )}", attributes, true); + } + + @Test + public void testAndOrNot() { + final 
Map<String, String> attributes = new HashMap<>(); + attributes.put("filename1", "xabc"); + attributes.put("filename2", "yabc"); + attributes.put("filename 3", "abcxy"); + + final String query + = "${" + + " 'non-existing':notNull():not():and(" + // true AND ( + " ${filename1:startsWith('y')" + // false + " :or(" + // or + " ${ filename1:startsWith('x'):and(false) }" + // false + " ):or(" + // or + " ${ filename2:endsWith('xxxx'):or( ${'filename 3':length():gt(1)} ) }" + // true ) + " )}" + + " )" + + "}"; + + System.out.println(query); + verifyEquals(query, attributes, true); + } + + @Test + public void testAndOrLogicWithAnyAll() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename1", "xabc"); + attributes.put("filename2", "yabc"); + attributes.put("filename 3", "abcxy"); + + verifyEquals("${anyMatchingAttribute('filename.*'):contains('abc'):and( ${filename2:equals('yabc')} )}", attributes, true); + verifyEquals("${anyMatchingAttribute('filename.*'):contains('abc'):and( ${filename2:equals('xabc')} )}", attributes, false); + verifyEquals("${anyMatchingAttribute('filename.*'):contains('abc'):not():or( ${filename2:equals('yabc')} )}", attributes, true); + verifyEquals("${anyMatchingAttribute('filename.*'):contains('abc'):not():or( ${filename2:equals('xabc')} )}", attributes, false); + } + + @Test + public void testKeywords() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("UUID", "123"); + verifyEquals("${ 'UUID':toNumber():equals(123) }", attributes, true); + } + + @Test + public void testEqualsNumber() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "123"); + verifyEquals("${ abc:toNumber():equals(123) }", attributes, true); + } + + @Test + public void testSubjectAsEmbeddedExpressionWithSurroundChars() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("b", "x"); + attributes.put("abcxcba", "hello"); + + final String evaluated = 
Query.evaluateExpressions("${ 'abc${b}cba':substring(0, 1) }", attributes, null); + assertEquals("h", evaluated); + } + + @Test + public void testToNumberFunctionReturnsNumberType() { + assertEquals(ResultType.NUMBER, Query.getResultType("${header.size:toNumber()}")); + } + + @Test + public void testAnyAttributeEmbedded() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a1", "test1"); + attributes.put("b2", "2test"); + attributes.put("c3", "3test3"); + + final String query = "${a1:equals('test1'):and( ${anyAttribute('a1','b2','c3'):contains('2')})}"; + verifyEquals(query, attributes, true); + } + + @Test + public void testEvaluateWithinCurlyBraces() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + + final String query = "{ ${abc} }"; + final List<String> expressions = Query.extractExpressions(query); + assertEquals(1, expressions.size()); + assertEquals("${abc}", expressions.get(0)); + assertEquals("{ xyz }", Query.evaluateExpressions(query, attributes)); + } + + @Test + public void testLiteralFunction() { + final Map<String, String> attrs = Collections.<String, String> emptyMap(); + verifyEquals("${literal(2):gt(1)}", attrs, true); + verifyEquals("${literal('hello'):substring(0, 1):equals('h')}", attrs, true); + } + + @Test + public void testFunctionAfterReduce() { + // Cannot call gt(2) after count() because count() is a 'reducing function' + // and must be the last function in an expression. 
+ assertFalse(Query.isValidExpression("${allMatchingAttributes('a.*'):contains('2'):count():gt(2)}")); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a.1", "245"); + attributes.put("a.2", "123"); + attributes.put("a.3", "732"); + attributes.put("a.4", "343"); + attributes.put("a.5", "553"); + + final String endsWithCount = "${allMatchingAttributes('a.*'):contains('2'):count()}"; + assertTrue(Query.isValidExpression(endsWithCount)); + verifyEquals(endsWithCount, attributes, 3L); + + // in order to check if value is greater than 2, need to first evaluate the + // 'aggregate' and 'reducing' functions as an inner expression. Then we can + // use the literal() function to make the result of the inner expression the subject + // of the function gt() + final String usingLiteral = "${literal(" + endsWithCount + "):gt(2)}"; + assertTrue(Query.isValidExpression(usingLiteral)); + verifyEquals(usingLiteral, attributes, true); + + attributes.clear(); + attributes.put("a1", "123"); + attributes.put("a2", "321"); + verifyEquals("${allMatchingAttributes('a.*'):contains('2')}", attributes, true); + verifyEquals("${allMatchingAttributes('a.*'):contains('2'):toUpper():equals('TRUE')}", attributes, true); + verifyEquals("${allMatchingAttributes('a.*'):contains('2'):equals('true'):and( ${literal(true)} )}", attributes, true); + } + + private void verifyEquals(final String expression, final Map<String, String> attributes, final Object expectedResult) { + Query.validateExpression(expression, false); + assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null)); + + final Query query = Query.compile(expression); + final QueryResult<?> result = query.evaluate(attributes); + + if (expectedResult instanceof Number) { + assertEquals(ResultType.NUMBER, result.getResultType()); + } else if (expectedResult instanceof Boolean) { + assertEquals(ResultType.BOOLEAN, result.getResultType()); + } else { + 
assertEquals(ResultType.STRING, result.getResultType()); + } + + assertEquals(expectedResult, result.getValue()); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java new file mode 100644 index 0000000..5acba8d --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.attribute.expression.language;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests for {@link StandardPreparedQuery}. Covers simple and embedded attribute references and
 * several sequential expressions, plus two timing loops that print elapsed milliseconds.
 */
public class TestStandardPreparedQuery {

    @Test
    public void testSimpleReference() {
        final Map<String, String> values = new HashMap<>();
        values.put("xx", "world");

        assertEquals("world", evaluate("${xx}", values));
        assertEquals("hello, world!", evaluate("hello, ${xx}!", values));
    }

    @Test
    public void testEmbeddedReference() {
        final Map<String, String> values = new HashMap<>();
        values.put("xx", "yy");
        values.put("yy", "world");

        // ${xx} evaluates to "yy", which is then used as the attribute name to look up.
        assertEquals("world", evaluate("${${xx}}", values));
    }

    @Test
    public void test10MIterations() {
        final Map<String, String> values = new HashMap<>();
        values.put("xx", "world");

        // Prepare once, evaluate many times: this loop times the prepared path.
        final StandardPreparedQuery prepared = (StandardPreparedQuery) Query.prepare("${xx}");
        final long startNanos = System.nanoTime();
        int remaining = 10000000;
        while (remaining-- > 0) {
            assertEquals("world", prepared.evaluateExpressions(values, null));
        }
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(elapsedMillis);
    }

    @Test
    @Ignore("Takes too long")
    public void test10MIterationsWithQuery() {
        final Map<String, String> values = new HashMap<>();
        values.put("xx", "world");

        // Same workload as test10MIterations, but the expression is parsed on every call.
        final long startNanos = System.nanoTime();
        int remaining = 10000000;
        while (remaining-- > 0) {
            assertEquals("world", Query.evaluateExpressions("${xx}", values));
        }
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(elapsedMillis);
    }

    @Test
    public void testSeveralSequentialExpressions() {
        final Map<String, String> values = new HashMap<>();
        values.put("audience", "World");
        values.put("comma", ",");
        values.put("question", " how are you?");

        final String result = evaluate("Hello, ${audience}${comma}${question}!", values);
        assertEquals("Hello, World, how are you?!", result);
    }

    /** Prepares {@code query} and evaluates it against {@code attrs} with no decorator. */
    private String evaluate(final String query, final Map<String, String> attrs) {
        return ((StandardPreparedQuery) Query.prepare(query)).evaluateExpressions(attrs, null);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/pom.xml b/nifi-commons/nifi-flowfile-packager/pom.xml new file mode 100644 index 0000000..7173f03 --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/pom.xml @@ -0,0 +1,34 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+ --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-flowfile-packager</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java new file mode 100644 index 0000000..ae16f99 --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** Serializes one FlowFile (its attributes plus its content) into a single packaged stream. The implementations in this module are FlowFilePackagerV1 (a tar archive) and FlowFilePackagerV2/V3 (length-prefixed binary layouts). */ public interface FlowFilePackager { + + /** Writes the packaged form of one FlowFile to {@code out}. @param in source of the content bytes @param out destination of the packaged bytes; whether it is closed afterwards depends on the implementation (V1 wraps it in a tar stream that it closes, while V2/V3 leave it open) @param attributes FlowFile attributes to include in the package @param fileSize number of content bytes; every implementation here records it in the package (tar entry size or an 8-byte length field), so it should match what {@code in} supplies @throws IOException if reading {@code in} or writing {@code out} fails */ void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java new file mode 100644 index 0000000..479ac58 --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.StringEscapeUtils;

/**
 * Packages a FlowFile as a tar archive containing two entries, in this order:
 * <ol>
 * <li>{@value #FILENAME_ATTRIBUTES}: the attributes as a Java XML properties document, UTF-8 encoded</li>
 * <li>{@value #FILENAME_CONTENT}: the raw content bytes</li>
 * </ol>
 * Both entries are written with the configured tar permission bits.
 */
public class FlowFilePackagerV1 implements FlowFilePackager {

    public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
    public static final String FILENAME_CONTENT = "flowfile.content";
    public static final int DEFAULT_TAR_PERMISSIONS = 0644;

    private final int tarPermissions;

    /** Creates a packager whose tar entries use {@link #DEFAULT_TAR_PERMISSIONS}. */
    public FlowFilePackagerV1() {
        this(DEFAULT_TAR_PERMISSIONS);
    }

    /**
     * @param tarPermissions mode bits applied to both tar entries (for example {@code 0644})
     */
    public FlowFilePackagerV1(final int tarPermissions) {
        this.tarPermissions = tarPermissions;
    }

    /**
     * Writes the attributes entry followed by the content entry as a tar archive to {@code out}.
     *
     * <p>The tar stream that wraps {@code out} is closed before this method returns, which also
     * closes {@code out}.</p>
     *
     * @param in content to package; read until end of stream
     * @param out destination of the tar archive; closed on return
     * @param attributes attributes to package; {@code null} is written as an empty properties document
     * @param fileSize number of bytes {@code in} supplies; recorded as the content entry size, and
     *        commons-compress raises an IOException if the content written does not match it
     * @throws IOException if reading {@code in} or writing the archive fails
     */
    @Override
    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
        try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) {
            writeAttributesEntry(attributes, tout);
            writeContentEntry(tout, in, fileSize);
            // Write the end-of-archive records explicitly; try-with-resources then closes tout exactly once.
            tout.finish();
        }
    }

    /** Serializes the attributes as an XML properties document into the {@value #FILENAME_ATTRIBUTES} entry. */
    private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException {
        final StringBuilder sb = new StringBuilder();
        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n");
        sb.append("<properties>");
        if (attributes != null) {
            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
                // Escape keys and values so arbitrary attribute text cannot break the XML document.
                final String escapedKey = StringEscapeUtils.escapeXml11(entry.getKey());
                final String escapedValue = StringEscapeUtils.escapeXml11(entry.getValue());
                sb.append("\n <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>");
            }
        }
        sb.append("</properties>");

        final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
        final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES);
        attribEntry.setMode(tarPermissions);
        attribEntry.setSize(metaBytes.length);
        tout.putArchiveEntry(attribEntry);
        tout.write(metaBytes);
        tout.closeArchiveEntry();
    }

    /** Streams {@code inStream} into the {@value #FILENAME_CONTENT} entry, declared as {@code fileSize} bytes. */
    private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException {
        final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT);
        entry.setMode(tarPermissions);
        entry.setSize(fileSize);
        tarOut.putArchiveEntry(entry);

        final byte[] buffer = new byte[512 << 10]; // 512 KB
        int bytesRead;
        // Read until end of stream (-1). A zero-length read is skipped rather than treated as EOF.
        // This loop drains inStream completely, so no further copy is needed afterwards.
        while ((bytesRead = inStream.read(buffer)) != -1) {
            if (bytesRead > 0) {
                tarOut.write(buffer, 0, bytesRead);
            }
        }
        tarOut.closeArchiveEntry();
    }

    /**
     * Copies {@code source} to {@code destination} until a read returns zero or end of stream.
     *
     * @return the number of bytes copied
     * @throws IOException if reading or writing fails
     */
    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
        final byte[] buffer = new byte[8192];
        int len;
        long totalCount = 0L;
        while ((len = source.read(buffer)) > 0) {
            destination.write(buffer, 0, len);
            totalCount += len;
        }
        return totalCount;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java new file mode 100644 index 0000000..6f9d6b1 --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * <p>
 * Packages a FlowFile, including both its content and its attributes, into a
 * single file that is stream-friendly. The encoding scheme is as follows:
 * </p>
 *
 * <pre>
 * Length Field : indicates the number of Flow File Attributes in the stream
 * 1 to N times (N=number of Flow File Attributes):
 *      String Field : Flow File Attribute key name
 *      String Field : Flow File Attribute value
 * Long : 8 bytes indicating the length of the Flow File content
 * Content : The next M bytes are the content of the Flow File.
 * </pre>
 *
 * <pre>
 * Encoding of String Field is as follows:
 *      Length Field : indicates the length of the String
 *      1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value.
 * </pre>
 *
 * <pre>
 * Encoding of Length Field is as follows:
 *      First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is
 *      greater than or equal to 65535 bytes; therefore, the next 4 bytes will indicate the actual length.
 * </pre>
 *
 * <p>
 * Note: All byte-order encoding is Network Byte Order (Most Significant Byte
 * first). A {@code null} attribute map is encoded as zero attributes.
 * </p>
 *
 * <p>
 * The following example shows the bytes expected if we were to encode a
 * FlowFile containing the following attributes where the content is the text
 * "Hello World!":
 * </p>
 *
 * <p>Attributes:</p>
 * <pre>
 * +-------+-------+
 * | Key   | Value |
 * +-------+-------+
 * | A     | a     |
 * +-------+-------+
 * | B     | b     |
 * +-------+-------+
 * </pre>
 *
 * <p>Content: Hello World!</p>
 *
 * <p>Packaged Byte Encoding (In Hexadecimal Form):</p>
 * <pre>
 * 00 02 00 01 41 00 01 61
 * 00 01 42 00 01 62 00 00
 * 00 00 00 00 00 0C 48 65
 * 6C 6C 6F 20 57 6F 72 6C
 * 64 21
 * </pre>
 *
 * <p>
 * Instances reuse a scratch buffer held in a field, so a single instance must not be
 * used by concurrent callers.
 * </p>
 */
public class FlowFilePackagerV2 implements FlowFilePackager {

    private static final int MAX_VALUE_2_BYTES = 65535;
    private final byte[] writeBuffer = new byte[8];

    /**
     * Writes the attribute count, each key/value pair, the content length, and then the content to {@code out}.
     * {@code out} is not closed.
     *
     * @param in content to package; copied until end of stream
     * @param out destination stream
     * @param attributes attributes to package; {@code null} is written as zero attributes
     * @param fileSize value written as the 8-byte content length; should equal the number of bytes {@code in} supplies
     * @throws IOException if reading or writing fails
     */
    @Override
    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
        if (attributes == null) {
            writeFieldLength(out, 0);
        } else {
            writeFieldLength(out, attributes.size()); //write out the number of attributes
            for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
                writeString(entry.getKey(), out);
                writeString(entry.getValue(), out);
            }
        }
        writeLong(out, fileSize);//write out length of data
        copy(in, out);//write out the actual flow file payload
    }

    /** Copies {@code in} to {@code out} until a read returns zero or end of stream. */
    private void copy(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buffer = new byte[65536];
        int len;
        while ((len = in.read(buffer)) > 0) {
            out.write(buffer, 0, len);
        }
    }

    /** Writes {@code val} as a Length Field followed by its UTF-8 bytes. */
    private void writeString(final String val, final OutputStream out) throws IOException {
        final byte[] bytes = val.getBytes(StandardCharsets.UTF_8);
        writeFieldLength(out, bytes.length);
        out.write(bytes);
    }

    /** Writes {@code numBytes} as a 2-byte Length Field, or as FF FF followed by 4 bytes when it is 65535 or more. */
    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
        // If the value is less than the max value that can be fit into 2 bytes, just use the
        // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
        // 4 bytes to indicate the real length. 65535 itself is reserved as that marker.
        if (numBytes < MAX_VALUE_2_BYTES) {
            writeBuffer[0] = (byte) (numBytes >>> 8);
            writeBuffer[1] = (byte) (numBytes);
            out.write(writeBuffer, 0, 2);
        } else {
            writeBuffer[0] = (byte) 0xff;
            writeBuffer[1] = (byte) 0xff;
            writeBuffer[2] = (byte) (numBytes >>> 24);
            writeBuffer[3] = (byte) (numBytes >>> 16);
            writeBuffer[4] = (byte) (numBytes >>> 8);
            writeBuffer[5] = (byte) (numBytes);
            out.write(writeBuffer, 0, 6);
        }
    }

    /** Writes {@code val} as 8 big-endian bytes. */
    private void writeLong(final OutputStream out, final long val) throws IOException {
        writeBuffer[0] = (byte) (val >>> 56);
        writeBuffer[1] = (byte) (val >>> 48);
        writeBuffer[2] = (byte) (val >>> 40);
        writeBuffer[3] = (byte) (val >>> 32);
        writeBuffer[4] = (byte) (val >>> 24);
        writeBuffer[5] = (byte) (val >>> 16);
        writeBuffer[6] = (byte) (val >>> 8);
        writeBuffer[7] = (byte) (val);
        out.write(writeBuffer, 0, 8);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java new file mode 100644 index 0000000..181f3e3 --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Packages a FlowFile into a single stream-friendly binary record. The record contains, in order:
 * the {@link #MAGIC_HEADER} bytes, the attribute count, each attribute key and value, the content
 * length as 8 big-endian bytes, and finally the content itself.
 *
 * <p>Counts and string lengths use a "length field". Values below 65535 take 2 big-endian bytes.
 * Otherwise the marker bytes {@code FF FF} are written, followed by the value as 4 big-endian bytes.
 * Strings are UTF-8 encoded. A {@code null} attribute map is written as zero attributes.</p>
 *
 * <p>Instances reuse a scratch buffer held in a field, so a single instance must not be used by
 * concurrent callers.</p>
 */
public class FlowFilePackagerV3 implements FlowFilePackager {

    /** Bytes written at the start of every package. The array is public and mutable, so callers must not modify it. */
    public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
    private static final int MAX_VALUE_2_BYTES = 65535;
    private final byte[] writeBuffer = new byte[8];

    /**
     * Writes the magic header, the attributes, the content length, and then the content to {@code out}.
     * {@code out} is not closed.
     *
     * @param in content to package; copied until end of stream
     * @param out destination stream
     * @param attributes attributes to package; {@code null} is written as zero attributes
     * @param fileSize value written as the 8-byte content length; should equal the number of bytes {@code in} supplies
     * @throws IOException if reading or writing fails
     */
    @Override
    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
        out.write(MAGIC_HEADER);

        if (attributes == null) {
            writeFieldLength(out, 0);
        } else {
            writeFieldLength(out, attributes.size()); //write out the number of attributes
            for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
                writeString(entry.getKey(), out);
                writeString(entry.getValue(), out);
            }
        }

        writeLong(out, fileSize);//write out length of data
        copy(in, out);//write out the actual flow file payload
    }

    /** Copies {@code in} to {@code out} until a read returns zero or end of stream. */
    private void copy(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buffer = new byte[65536];
        int len;
        while ((len = in.read(buffer)) > 0) {
            out.write(buffer, 0, len);
        }
    }

    /** Writes {@code val} as a length field followed by its UTF-8 bytes. */
    private void writeString(final String val, final OutputStream out) throws IOException {
        final byte[] bytes = val.getBytes(StandardCharsets.UTF_8);
        writeFieldLength(out, bytes.length);
        out.write(bytes);
    }

    /** Writes {@code numBytes} as a 2-byte length field, or as FF FF followed by 4 bytes when it is 65535 or more. */
    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
        // If the value is less than the max value that can be fit into 2 bytes, just use the
        // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
        // 4 bytes to indicate the real length. 65535 itself is reserved as that marker.
        if (numBytes < MAX_VALUE_2_BYTES) {
            writeBuffer[0] = (byte) (numBytes >>> 8);
            writeBuffer[1] = (byte) (numBytes);
            out.write(writeBuffer, 0, 2);
        } else {
            writeBuffer[0] = (byte) 0xff;
            writeBuffer[1] = (byte) 0xff;
            writeBuffer[2] = (byte) (numBytes >>> 24);
            writeBuffer[3] = (byte) (numBytes >>> 16);
            writeBuffer[4] = (byte) (numBytes >>> 8);
            writeBuffer[5] = (byte) (numBytes);
            out.write(writeBuffer, 0, 6);
        }
    }

    /** Writes {@code val} as 8 big-endian bytes. */
    private void writeLong(final OutputStream out, final long val) throws IOException {
        writeBuffer[0] = (byte) (val >>> 56);
        writeBuffer[1] = (byte) (val >>> 48);
        writeBuffer[2] = (byte) (val >>> 40);
        writeBuffer[3] = (byte) (val >>> 32);
        writeBuffer[4] = (byte) (val >>> 24);
        writeBuffer[5] = (byte) (val >>> 16);
        writeBuffer[6] = (byte) (val >>> 8);
        writeBuffer[7] = (byte) (val);
        out.write(writeBuffer, 0, 8);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java new file mode 100644 index 0000000..fd9d92d --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

/**
 * Extracts FlowFiles, one per call, from a packaged byte stream.
 */
public interface FlowFileUnpackager {

    /**
     * Reports whether a further call to {@link #unpackageFlowFile} is expected to yield a FlowFile.
     *
     * @return {@code true} if more packaged data may remain
     * @throws IOException if determining this requires I/O that fails
     */
    boolean hasMoreData() throws IOException;

    /**
     * Reads the next packaged FlowFile from {@code in}, writes its content to {@code out}, and
     * returns its attributes. NOTE(review): some implementations return {@code null} when no
     * FlowFile could be read; callers should check.
     *
     * @param in packaged input
     * @param out destination for the FlowFile content
     * @return the FlowFile attributes
     * @throws IOException if the input is malformed or truncated, or if I/O fails
     */
    Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException;

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java new file mode 100644 index 0000000..b96534a --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

/**
 * Unpackages the version 1 format. This is a tar archive whose first entry is
 * {@code FlowFilePackagerV1.FILENAME_ATTRIBUTES}, holding the attributes as an XML
 * {@link Properties} document. Its second entry is {@code FlowFilePackagerV1.FILENAME_CONTENT},
 * holding the content. This unpackager treats the stream as containing a single FlowFile.
 */
public class FlowFileUnpackagerV1 implements FlowFileUnpackager {

    /** Number of calls made to {@link #unpackageFlowFile}; drives {@link #hasMoreData()}. */
    private int flowFilesRead = 0;

    /**
     * Reads the attributes entry and then the content entry, copying the content to {@code out}
     * and flushing it.
     *
     * @return the attributes, or {@code null} if the archive contains no entries at all
     * @throws IOException if the first entry is not the attributes entry, the second entry is
     *                     missing or is not the content entry, or reading/writing fails
     */
    @Override
    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
        flowFilesRead++;
        // NOTE(review): tarIn is never closed, presumably so the caller's stream stays open.
        final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);
        final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
        if (attribEntry == null) {
            return null;
        }

        final Map<String, String> attributes;
        if (attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
            attributes = getAttributes(tarIn);
        } else {
            throw new IOException("Expected two tar entries: "
                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
        }

        final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();

        if (contentEntry != null && contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
            final byte[] buffer = new byte[512 << 10]; // 512KB
            int bytesRead;
            while ((bytesRead = tarIn.read(buffer)) != -1) { // still more data to read
                if (bytesRead > 0) {
                    out.write(buffer, 0, bytesRead);
                }
            }
            out.flush();
        } else {
            throw new IOException("Expected two tar entries: "
                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
        }

        return attributes;
    }

    /**
     * Parses the current tar entry as an XML properties document.
     *
     * @param stream tar stream positioned at the attributes entry; it is left open
     * @return the attributes as a mutable map
     * @throws IOException if the XML cannot be parsed or a key or value is not a String
     */
    protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException {

        final Properties props = new Properties();
        // Properties.loadFromXML closes the stream it is given; shield the tar stream, which still
        // has to be read for the content entry.
        props.loadFromXML(new NonCloseableInputStream(stream));

        final Map<String, String> result = new HashMap<>();
        for (final Entry<Object, Object> entry : props.entrySet()) {
            final Object keyObject = entry.getKey();
            final Object valueObject = entry.getValue();
            if (!(keyObject instanceof String)) {
                throw new IOException("Flow file attributes object contains key of type "
                        + keyObject.getClass().getCanonicalName()
                        + " but expected java.lang.String");
            } else if (!(valueObject instanceof String)) {
                // Previously re-tested keyObject, which made this value check unreachable.
                throw new IOException("Flow file attributes object contains value of type "
                        + valueObject.getClass().getCanonicalName()
                        + " but expected java.lang.String");
            }

            final String key = (String) keyObject;
            final String value = (String) valueObject;
            result.put(key, value);
        }

        return result;
    }

    /** Returns {@code true} only until {@link #unpackageFlowFile} has been called once. */
    @Override
    public boolean hasMoreData() throws IOException {
        return flowFilesRead == 0;
    }

    /**
     * {@link InputStream} wrapper whose {@link #close()} does nothing. All other operations are
     * delegated to the wrapped stream.
     */
    public static final class NonCloseableInputStream extends InputStream {

        final InputStream stream;

        public NonCloseableInputStream(final InputStream stream) {
            this.stream = stream;
        }

        /** Intentionally a no-op, so the wrapped stream stays open. */
        @Override
        public void close() {
        }

        @Override
        public int read() throws IOException {
            return stream.read();
        }

        @Override
        public int available() throws IOException {
            return stream.available();
        }

        @Override
        public synchronized void mark(int readlimit) {
            stream.mark(readlimit);
        }

        @Override
        public synchronized void reset() throws IOException {
            stream.reset();
        }

        @Override
        public boolean markSupported() {
            return stream.markSupported();
        }

        @Override
        public long skip(long n) throws IOException {
            return stream.skip(n);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return stream.read(b, off, len);
        }

        @Override
        public int read(byte[] b) throws IOException {
            return stream.read(b);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java new file mode 100644 index 0000000..500015f --- /dev/null +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.util;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Reads back-to-back version 2 packages. Each package consists of:
 * <ol>
 * <li>an attribute count, encoded as a field length;</li>
 * <li>length-prefixed UTF-8 key/value strings;</li>
 * <li>an 8-byte big-endian content length;</li>
 * <li>the content.</li>
 * </ol>
 * A field length is 2 big-endian bytes, or 0xFF 0xFF followed by 4 big-endian bytes.
 *
 * <p>After each payload the attributes of the following package are read ahead, and
 * {@link #hasMoreData()} reports whether that look-ahead found one. NOTE(review): instances hold
 * per-stream state and a shared read buffer, so they should not be shared between threads.
 */
public class FlowFileUnpackagerV2 implements FlowFileUnpackager {

    /** Scratch buffer used by {@link #readLong} and {@link #copy}. */
    private final byte[] readBuffer = new byte[8192];
    /** Look-ahead attributes of the next package, or {@code null} if none were found. */
    private Map<String, String> nextAttributes = null;
    /** Becomes {@code true} after the first successful {@link #unpackageFlowFile} call. */
    private boolean haveReadSomething = false;

    /**
     * Returns {@code true} before anything has been unpackaged. After that, it returns
     * {@code true} only if the look-ahead found another package.
     */
    @Override
    public boolean hasMoreData() throws IOException {
        return nextAttributes != null || !haveReadSomething;
    }

    /**
     * Reads one attribute block.
     *
     * @return the attributes, or {@code null} if the stream was already at end-of-stream
     * @throws IOException if the block declares zero attributes or is truncated or malformed
     */
    protected Map<String, String> readAttributes(final InputStream in) throws IOException {
        final Map<String, String> attributes = new HashMap<>();
        final Integer numAttributes = readFieldLength(in); // number of attributes
        if (numAttributes == null) {
            return null;
        }
        if (numAttributes == 0) {
            throw new IOException("flow files cannot have zero attributes");
        }
        for (int i = 0; i < numAttributes; i++) {
            final String key = readString(in);
            final String value = readString(in);
            attributes.put(key, value);
        }

        return attributes;
    }

    /**
     * Returns the attributes of the next package and copies its content to {@code out}. It then
     * reads ahead the attributes of the following package, if any.
     *
     * @throws IOException if the package is malformed; a truncated stream surfaces as
     *                     {@link EOFException}
     */
    @Override
    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
        final Map<String, String> attributes;
        if (nextAttributes != null) {
            attributes = nextAttributes;
        } else {
            attributes = readAttributes(in);
        }

        final long expectedNumBytes = readLong(in); // length of payload
        copy(in, out, expectedNumBytes);            // payload

        nextAttributes = readAttributes(in);
        haveReadSomething = true;

        return attributes;
    }

    /**
     * Reads a field length followed by that many bytes, decoded as UTF-8.
     *
     * @throws EOFException if the stream ends before the string is complete
     */
    protected String readString(final InputStream in) throws IOException {
        final Integer numBytes = readFieldLength(in);
        if (numBytes == null) {
            throw new EOFException();
        }
        final byte[] bytes = new byte[numBytes];
        fillBuffer(in, bytes, numBytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Reads exactly {@code length} bytes into {@code buffer}, or throws {@link EOFException}. */
    private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
        int bytesRead;
        int totalBytesRead = 0;
        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
            totalBytesRead += bytesRead;
        }
        if (totalBytesRead != length) {
            throw new EOFException();
        }
    }

    /**
     * Copies exactly {@code numBytes} bytes from {@code in} to {@code out}.
     *
     * @return the number of bytes copied (always {@code numBytes} on success)
     * @throws IOException if {@code numBytes} is negative (corrupt length header)
     * @throws EOFException if the stream ends early
     */
    protected long copy(final InputStream in, final OutputStream out, final long numBytes) throws IOException {
        if (numBytes < 0) {
            // A negative length would otherwise reach InputStream.read as a negative len argument.
            throw new IOException("Invalid content length: " + numBytes);
        }

        int bytesRead;
        long totalBytesRead = 0L;
        while ((bytesRead = in.read(readBuffer, 0, (int) Math.min(readBuffer.length, numBytes - totalBytesRead))) > 0) {
            out.write(readBuffer, 0, bytesRead);
            totalBytesRead += bytesRead;
        }

        if (totalBytesRead < numBytes) {
            throw new EOFException("Expected " + numBytes + " but received " + totalBytesRead);
        }

        return totalBytesRead;
    }

    /** Reads 8 bytes as a big-endian signed long. */
    protected long readLong(final InputStream in) throws IOException {
        fillBuffer(in, readBuffer, 8);
        return (((long) readBuffer[0] << 56)
                + ((long) (readBuffer[1] & 255) << 48)
                + ((long) (readBuffer[2] & 255) << 40)
                + ((long) (readBuffer[3] & 255) << 32)
                + ((long) (readBuffer[4] & 255) << 24)
                + ((readBuffer[5] & 255) << 16)
                + ((readBuffer[6] & 255) << 8)
                + ((readBuffer[7] & 255)));
    }

    /**
     * Reads a field length: 2 big-endian bytes, or the marker 0xFF 0xFF followed by 4 big-endian
     * bytes.
     *
     * @return the length, or {@code null} if the stream is at end-of-stream before the first byte
     * @throws EOFException if the stream ends part-way through the field
     * @throws IOException if the 4-byte form decodes to a negative value (corrupt input)
     */
    private Integer readFieldLength(final InputStream in) throws IOException {
        final int firstValue = in.read();
        final int secondValue = in.read();
        if (firstValue < 0) {
            return null;
        }
        if (secondValue < 0) {
            throw new EOFException();
        }
        if (firstValue == 0xff && secondValue == 0xff) {
            int ch1 = in.read();
            int ch2 = in.read();
            int ch3 = in.read();
            int ch4 = in.read();
            if ((ch1 | ch2 | ch3 | ch4) < 0) {
                throw new EOFException();
            }
            final int length = (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4);
            if (length < 0) {
                // Would otherwise surface as NegativeArraySizeException in readString.
                throw new IOException("Invalid field length: " + length);
            }
            return length;
        } else {
            return ((firstValue << 8) + (secondValue));
        }
    }
}