http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
 
b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
new file mode 100644
index 0000000..131bcde
--- /dev/null
+++ 
b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
@@ -0,0 +1,1176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.attribute.expression.language;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.nifi.attribute.expression.language.Query.Range;
+import org.apache.nifi.attribute.expression.language.evaluation.QueryResult;
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.antlr.runtime.tree.Tree;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestQuery {
+
+    /**
+     * Exercises the compiler with a mix of expressions that must be accepted
+     * and expressions that must be rejected.
+     */
+    @Test
+    public void testCompilation() {
+        // uuid() chained onto a subject is rejected; UUID() on its own is accepted below
+        assertInvalid("${attr:uuid()}");
+        assertInvalid("${attr:indexOf(length())}");
+        assertValid("${UUID()}");
+        assertInvalid("${UUID():nextInt()}");
+        assertValid("${nextInt()}");
+        assertValid("${now():format('yyyy/MM/dd')}");
+        assertInvalid("${attr:times(3)}");
+        assertValid("${attr:toNumber():multiply(3)}");
+        assertValid("${hostname()}");
+        assertValid("${literal(3)}");
+        // Kept as a handy way to inspect evaluation output while debugging:
+        //System.out.println(Query.compile("").evaluate(null));
+    }
+
+    /**
+     * Asserts that the given expression compiles.
+     *
+     * @param query the expression text passed to {@code Query.compile}
+     * @throws AssertionError if compilation throws; the original exception is attached as the cause
+     */
+    private void assertValid(final String query) {
+        try {
+            Query.compile(query);
+        } catch (final Exception e) {
+            // Chain the exception instead of printing it, so the test report carries the full stack trace
+            throw new AssertionError("Expected query to be valid, but it failed to compile due to " + e, e);
+        }
+    }
+
+    /**
+     * Asserts that the given expression does not compile, i.e. that
+     * {@code Query.compile} throws an exception for it.
+     *
+     * @param query the expression text expected to be rejected
+     */
+    private void assertInvalid(final String query) {
+        boolean compiled;
+        try {
+            Query.compile(query);
+            compiled = true;
+        } catch (final Exception expected) {
+            // any exception from compile means the query was rejected, which is what we want
+            compiled = false;
+        }
+
+        if (compiled) {
+            Assert.fail("Expected query to be invalid, but it did compile");
+        }
+    }
+
+    @Test
+    public void testIsValidExpression() {
+        Query.validateExpression("${abc:substring(${xyz:length()})}", false);
+        Query.isValidExpression("${now():format('yyyy-MM-dd')}");
+
+        try {
+            Query.validateExpression("$${attr}", false);
+            Assert.fail("invalid query validated");
+        } catch (final AttributeExpressionLanguageParsingException e) {
+        }
+
+        Query.validateExpression("$${attr}", true);
+
+        Query.validateExpression("${filename:startsWith('T8MTXBC')\n"
+            + ":or( ${filename:startsWith('C4QXABC')} )\n"
+            + ":or( ${filename:startsWith('U6CXEBC')} )"
+            + ":or( ${filename:startsWith('KYM3ABC')} )}", false);
+    }
+
+    /**
+     * Compiles an expression with an embedded expression as its argument,
+     * prints the parse tree, then evaluates and validates the same text.
+     */
+    @Test
+    public void testCompileEmbedded() {
+        final String expression = "${x:equals( ${y} )}";
+
+        // dump the parse tree for manual inspection
+        final Tree parsed = Query.compile(expression).getTree();
+        System.out.println(printTree(parsed));
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("x", "x");
+        attrs.put("y", "x");
+        assertEquals("true", Query.evaluateExpressions(expression, attrs, null));
+
+        Query.validateExpression(expression, false);
+    }
+
+    /**
+     * Renders the given tree as indented text, one node per line.
+     *
+     * @param tree root node to render
+     * @return the rendered text
+     */
+    private String printTree(final Tree tree) {
+        final StringBuilder rendered = new StringBuilder();
+        printTree(tree, 0, rendered);
+        return rendered.toString();
+    }
+
+    private void printTree(final Tree tree, final int spaces, final 
StringBuilder sb) {
+        for (int i = 0; i < spaces; i++) {
+            sb.append(" ");
+        }
+
+        if (tree.getText().trim().isEmpty()) {
+            sb.append(tree.toString()).append("\n");
+        } else {
+            sb.append(tree.getText()).append("\n");
+        }
+
+        for (int i = 0; i < tree.getChildCount(); i++) {
+            printTree(tree.getChild(i), spaces + 2, sb);
+        }
+    }
+
+    /**
+     * Checks how runs of '$' in front of an expression are evaluated:
+     * the expected values below show "$$" collapsing to a literal "$".
+     */
+    @Test
+    public void testEscape() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "My Value");
+        attrs.put("${xx}", "hello");
+
+        // one '$': plain attribute reference
+        assertEquals("My Value", evaluateQueryForEscape("${attr}", attrs));
+        // two '$': escaped, left as literal text
+        assertEquals("${attr}", evaluateQueryForEscape("$${attr}", attrs));
+        // three '$': one literal '$' followed by the evaluated reference
+        assertEquals("$My Value", evaluateQueryForEscape("$$${attr}", attrs));
+        // four '$': literal '$' followed by the escaped text
+        assertEquals("$${attr}", evaluateQueryForEscape("$$$${attr}", attrs));
+        // five '$': two literal '$' followed by the evaluated reference
+        assertEquals("$$My Value", evaluateQueryForEscape("$$$$${attr}", attrs));
+    }
+
+    /**
+     * Verifies handling of backslashes and escaped newlines inside quoted
+     * arguments, using both a Windows-style and a forward-slash path.
+     */
+    @Test
+    public void testWithBackSlashes() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("x", "C:\\test\\1.txt");
+        attributes.put("y", "y\ny");
+
+        // Literal restored to a single line; it had been split mid-string
+        final String query = "${x:substringAfterLast( '/' ):substringAfterLast( '\\\\' )}";
+        verifyEquals(query, attributes, "1.txt");
+        attributes.put("x", "C:/test/1.txt");
+        verifyEquals(query, attributes, "1.txt");
+
+        verifyEquals("${y:equals('y\\ny')}", attributes, Boolean.TRUE);
+    }
+
+    /**
+     * Quote characters outside an expression must not stop the expression
+     * from being found or evaluated.
+     */
+    @Test
+    public void testWithTicksOutside() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "My Value");
+
+        // each of these inputs contains exactly one expression
+        assertEquals(1, Query.extractExpressionRanges("\"${attr}").size());
+        assertEquals(1, Query.extractExpressionRanges("'${attr}").size());
+        assertEquals(1, Query.extractExpressionRanges("'${attr}'").size());
+        assertEquals(1, Query.extractExpressionRanges("${attr}").size());
+
+        // the surrounding quotes are carried through to the result
+        final String bothTicks = Query.evaluateExpressions("'${attr}'", attrs, null);
+        assertEquals("'My Value'", bothTicks);
+        final String leadingTick = Query.evaluateExpressions("'${attr}", attrs, null);
+        assertEquals("'My Value", leadingTick);
+    }
+
+    @Test
+    @Ignore("Depends on TimeZone")
+    public void testDateToNumber() {
+        final Query query = Query.compile("${dateTime:toDate('yyyy/MM/dd 
HH:mm:ss.SSS'):toNumber()}");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("dateTime", "2013/11/18 10:22:27.678");
+
+        final QueryResult<?> result = query.evaluate(attributes);
+        assertEquals(ResultType.NUMBER, result.getResultType());
+        assertEquals(1384788147678L, result.getValue());
+    }
+
+    @Test
+    public void testAddOneDayToDate() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("dateTime", "2013/11/18 10:22:27.678");
+
+        verifyEquals("${dateTime:toDate('yyyy/MM/dd 
HH:mm:ss.SSS'):toNumber():plus(86400000):toDate():format('yyyy/MM/dd 
HH:mm:ss.SSS')}", attributes, "2013/11/19 10:22:27.678");
+        verifyEquals("${dateTime:toDate('yyyy/MM/dd 
HH:mm:ss.SSS'):plus(86400000):format('yyyy/MM/dd HH:mm:ss.SSS')}", attributes, 
"2013/11/19 10:22:27.678");
+    }
+
+    /**
+     * Formats an attribute holding {@code Date.toString()} output, relying on
+     * the expression language to convert the string to a date implicitly.
+     * Ignored because it requires a specific locale.
+     */
+    @Test
+    @Ignore("Requires specific locale")
+    public void implicitDateConversion() {
+        final Date date = new Date();
+        // Literal restored to a single line; it had been split mid-string
+        final Query query = Query.compile("${dateTime:format('yyyy/MM/dd HH:mm:ss.SSS')}");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("dateTime", date.toString());
+
+        // the date.toString() above will end up truncating the milliseconds.
+        // So remove millis from the Date before formatting it
+        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US);
+        final long millis = date.getTime() % 1000L;
+        final Date roundedToNearestSecond = new Date(date.getTime() - millis);
+        final String formatted = sdf.format(roundedToNearestSecond);
+
+        final QueryResult<?> result = query.evaluate(attributes);
+        assertEquals(ResultType.STRING, result.getResultType());
+        assertEquals(formatted, result.getValue());
+    }
+
+    /**
+     * An embedded expression used as an argument must work bare, inside
+     * single quotes, and inside double quotes.
+     */
+    @Test
+    public void testEmbeddedExpressionsAndQuotes() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("x", "abc");
+        attrs.put("a", "abc");
+
+        verifyEquals("${x:equals(${a})}", attrs, true);
+
+        final String singleQuoted = "${x:equals('${a}')}";
+        Query.validateExpression(singleQuoted, false);
+        assertEquals("true", Query.evaluateExpressions(singleQuoted, attrs, null));
+
+        final String doubleQuoted = "${x:equals(\"${a}\")}";
+        Query.validateExpression(doubleQuoted, false);
+        assertEquals("true", Query.evaluateExpressions(doubleQuoted, attrs, null));
+    }
+
+    /**
+     * Exercises join() on several attributes, on a single attribute, and on
+     * attributes that are not set.
+     */
+    @Test
+    public void testJoin() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("a.a", "a");
+        attrs.put("a.b", "b");
+        attrs.put("a.c", "c");
+
+        verifyEquals("${allAttributes( 'a.a', 'a.b', 'a.c' ):join(', ')}", attrs, "a, b, c");
+        // "x" is not set: the join yields an empty string
+        verifyEquals("${x:join(', ')}", attrs, "");
+        verifyEquals("${a.a:join(', ')}", attrs, "a");
+        // two unset attributes still produce the single delimiter between them
+        verifyEquals("${allAttributes( 'x', 'y' ):join(',')}", attrs, ",");
+    }
+
+    /**
+     * Compiling allAttributes(...) followed directly by plus(1) is expected
+     * to throw an {@link AttributeExpressionLanguageException}.
+     */
+    @Test(expected = AttributeExpressionLanguageException.class)
+    public void testCannotCombineWithNonReducingFunction() {
+        final String expression = "${allAttributes( 'a.1' ):plus(1)}";
+        Query.compileTree(expression);
+    }
+
+    /**
+     * isEmpty() is expected to be true for an empty value, a whitespace-only
+     * value and an unset attribute, and false for a non-blank value.
+     */
+    @Test
+    public void testIsEmpty() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("a", "a");
+        attrs.put("b", "");
+        attrs.put("c", "        \n");
+
+        verifyEquals("${a:isEmpty()}", attrs, false);
+        verifyEquals("${b:isEmpty()}", attrs, true);
+        verifyEquals("${c:isEmpty()}", attrs, true);
+        // "d" was never put into the map
+        verifyEquals("${d:isEmpty()}", attrs, true);
+    }
+
+    /**
+     * replaceEmpty('c') is expected to leave a non-blank value alone and to
+     * substitute 'c' for empty, whitespace-only and unset attributes.
+     */
+    @Test
+    public void testReplaceEmpty() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("a", "a");
+        attrs.put("b", "");
+        attrs.put("c", "        \n");
+
+        verifyEquals("${a:replaceEmpty('c')}", attrs, "a");
+        verifyEquals("${b:replaceEmpty('c')}", attrs, "c");
+        verifyEquals("${c:replaceEmpty('c')}", attrs, "c");
+        // "d" was never put into the map
+        verifyEquals("${d:replaceEmpty('c')}", attrs, "c");
+    }
+
+    @Test
+    public void testCount() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a", "a");
+        attributes.put("b", "abc");
+        attributes.put("c", "        \n");
+        attributes.put("n1", "111");
+        attributes.put("n2", "222");
+        attributes.put("n3", "333333");
+
+        verifyEquals("${allMatchingAttributes( '.*' ):count()}", attributes, 
6L);
+        verifyEquals("${allMatchingAttributes( '.*' 
):length():gt(2):count()}", attributes, 5L);
+        verifyEquals("${allMatchingAttributes( 'n.*' ):plus(1):count()}", 
attributes, 3L);
+    }
+
+    @Test
+    public void testCurlyBracesInQuotes() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("attr", "My Valuee");
+
+        assertEquals("Val", evaluateQueryForEscape("${attr:replaceAll('My 
(Val)ue{1,2}', '$1')}", attributes));
+        assertEquals("Val", evaluateQueryForEscape("${attr:replaceAll(\"My 
(Val)ue{1,2}\", '$1')}", attributes));
+    }
+
+    /**
+     * Evaluates {@code queryString} against a mocked {@link FlowFile} whose
+     * attributes are the given map.
+     *
+     * @param queryString text containing zero or more expressions
+     * @param attributes  attribute map returned by the mock's {@code getAttributes()}
+     * @return the result of {@code Query.evaluateExpressions}
+     */
+    private String evaluateQueryForEscape(final String queryString, final Map<String, String> attributes) {
+        final FlowFile flowFile = Mockito.mock(FlowFile.class);
+
+        // the attributes are the only stubbed values that come from the caller
+        Mockito.when(flowFile.getAttributes()).thenReturn(attributes);
+
+        // remaining stubs return fixed or current-time placeholder values
+        Mockito.when(flowFile.getId()).thenReturn(1L);
+        Mockito.when(flowFile.getEntryDate()).thenReturn(System.currentTimeMillis());
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
+        Mockito.when(flowFile.getLineageIdentifiers()).thenReturn(new HashSet<String>());
+        Mockito.when(flowFile.getLineageStartDate()).thenReturn(System.currentTimeMillis());
+
+        return Query.evaluateExpressions(queryString, flowFile);
+    }
+
+    /** A bare attribute reference evaluates to that attribute's value. */
+    @Test
+    public void testGetAttributeValue() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "My Value");
+
+        final String expression = "${attr}";
+        verifyEquals(expression, attrs, "My Value");
+    }
+
+    /**
+     * The result of an inner expression (here the trimmed value "XX") is
+     * used as the name of the attribute to look up.
+     */
+    @Test
+    public void testGetAttributeValueEmbedded() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "XX ");
+        attrs.put("XX", "My Value");
+
+        final String expression = "${${attr:trim()}}";
+        verifyEquals(expression, attrs, "My Value");
+    }
+
+    /** substring(2, 5) of "My Value" is expected to be " Va". */
+    @Test
+    public void testSimpleSubstring() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "My Value");
+
+        final String expression = "${attr:substring(2, 5)}";
+        verifyEquals(expression, attrs, " Va");
+    }
+
+    /** A function can be chained onto the result of another function call. */
+    @Test
+    public void testCallToFunctionWithSubjectResultOfAnotherFunctionCall() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "   My Value   ");
+
+        // trim() runs first, so substring(2, 5) sees "My Value"
+        final String expression = "${attr:trim():substring(2, 5)}";
+        verifyEquals(expression, attrs, " Va");
+    }
+
+    /**
+     * Regression test: there was a bug that prevented this expression from
+     * compiling. The test only verifies that it now compiles.
+     */
+    @Test
+    public void testProblematic1() {
+        // Literal restored to a single line; it had been split mid-string
+        final String queryString = "${xx:append( \"120101\" ):toDate( 'yyMMddHHmmss' ):format( \"yy-MM-dd’T’HH:mm:ss\") }";
+        Query.compile(queryString);
+    }
+
+    /** equals() is applied to the trimmed value, so the padding does not matter. */
+    @Test
+    public void testEquals() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", " XX    ");
+
+        final String expression = "${attr:trim():equals('XX')}";
+        verifyEquals(expression, attrs, true);
+    }
+
+    /**
+     * Expression nested two levels deep: the innermost ${abc} supplies the
+     * name of the attribute whose length is then compared.
+     */
+    @Test
+    public void testDeeplyEmbedded() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("x", "false");
+        attrs.put("abc", "a");
+        attrs.put("a", "a");
+
+        final String expression = "${x:or( ${${abc}:length():equals(1)} )}";
+        verifyEquals(expression, attrs, true);
+    }
+
+    /**
+     * Verifies the start/end offsets reported by
+     * {@code Query.extractExpressionRanges} for inputs with no expression,
+     * unterminated or escaped expressions, nested expressions, and several
+     * expressions in one string.
+     *
+     * The asserted offsets are character positions within the input strings,
+     * so every literal must stay on one line exactly as written.
+     */
+    @Test
+    public void testExtractExpressionRanges() {
+        assertEquals(29, Query.extractExpressionRanges("${hello:equals( $${goodbye} )}").get(0).getEnd());
+
+        // inputs that contain no complete, unescaped expression
+        List<Range> ranges = Query.extractExpressionRanges("hello");
+        assertTrue(ranges.isEmpty());
+
+        ranges = Query.extractExpressionRanges("${hello");
+        assertTrue(ranges.isEmpty());
+
+        ranges = Query.extractExpressionRanges("hello}");
+        assertTrue(ranges.isEmpty());
+
+        ranges = Query.extractExpressionRanges("$${hello");
+        assertTrue(ranges.isEmpty());
+
+        ranges = Query.extractExpressionRanges("$he{ll}o");
+        assertTrue(ranges.isEmpty());
+
+        // single simple expression
+        ranges = Query.extractExpressionRanges("${hello}");
+        assertEquals(1, ranges.size());
+        Range range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(7, range.getEnd());
+
+        // a nested expression is reported as part of the outer range
+        ranges = Query.extractExpressionRanges("${hello:equals( ${goodbye} )}");
+        assertEquals(1, ranges.size());
+        range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(28, range.getEnd());
+
+        ranges = Query.extractExpressionRanges("${hello:equals( $${goodbye} )}");
+        assertEquals(1, ranges.size());
+        range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(29, range.getEnd());
+
+        // two expressions separated by plain text
+        ranges = Query.extractExpressionRanges("${hello:equals( $${goodbye} )} or just hi, ${bob:or(${jerry})}");
+        assertEquals(2, ranges.size());
+        range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(29, range.getEnd());
+
+        range = ranges.get(1);
+        assertEquals(43, range.getStart());
+        assertEquals(61, range.getEnd());
+
+        // three expressions; the trailing $$$${bob} is not reported as a range
+        ranges = Query.extractExpressionRanges("${hello:equals( ${goodbye} )} or just hi, ${bob}, are you ${bob.age:toNumber()} yet? $$$${bob}");
+        assertEquals(3, ranges.size());
+        range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(28, range.getEnd());
+
+        range = ranges.get(1);
+        assertEquals(42, range.getStart());
+        assertEquals(47, range.getEnd());
+
+        range = ranges.get(2);
+        assertEquals(58, range.getStart());
+        assertEquals(78, range.getEnd());
+
+        // braces inside a quoted regex do not end the expression
+        ranges = Query.extractExpressionRanges("${x:matches( '.{4}' )}");
+        assertEquals(1, ranges.size());
+        range = ranges.get(0);
+        assertEquals(0, range.getStart());
+        assertEquals(21, range.getEnd());
+    }
+
+    /**
+     * Verifies the result type reported for each expression found in a
+     * string that mixes plain text with three expressions.
+     */
+    @Test
+    public void testExtractExpressionTypes() {
+        // Literal restored to a single line; it had been split mid-string
+        final List<ResultType> types = Query.extractResultTypes(
+            "${hello:equals( ${goodbye} )} or just hi, ${bob}, are you ${bob.age:toNumber()} yet? $$$${bob}");
+        assertEquals(3, types.size());
+        assertEquals(ResultType.BOOLEAN, types.get(0));
+        assertEquals(ResultType.STRING, types.get(1));
+        assertEquals(ResultType.NUMBER, types.get(2));
+    }
+
+    @Test
+    public void testEqualsEmbedded() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("x", "hello");
+        attributes.put("y", "good-bye");
+
+        verifyEquals("${x:equals( ${y} )}", attributes, false);
+
+        attributes.put("y", "hello");
+        verifyEquals("${x:equals( ${y} )}", attributes, true);
+
+        attributes.put("x", "4");
+        attributes.put("y", "3");
+        attributes.put("z", "1");
+        attributes.put("h", "100");
+        verifyEquals("${x:toNumber():lt( ${y:toNumber():plus( ${h:toNumber()} 
)} )}", attributes, true);
+        verifyEquals("${h:toNumber():ge( ${y:toNumber():plus( ${z:toNumber()} 
)} )}", attributes, true);
+        verifyEquals("${x:toNumber():equals( ${y:toNumber():plus( 
${z:toNumber()} )} )}", attributes, true);
+
+        attributes.put("x", "88");
+        verifyEquals("${x:toNumber():gt( ${y:toNumber():plus( ${z:toNumber()} 
)} )}", attributes, true);
+
+        attributes.put("y", "88");
+        assertEquals("true", Query.evaluateExpressions("${x:equals( '${y}' 
)}", attributes, null));
+    }
+
+    @Test
+    public void testComplicatedEmbeddedExpressions() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("fox", "quick, brown");
+        attributes.put("dog", "lazy");
+
+        verifyEquals("${fox:substring( ${ 'dog' :substring(2):length()}, 5 
):equals( 'ick' )}", attributes, true);
+        verifyEquals("${fox:substring( ${ 'dog' :substring(2):length()}, 5 
):equals( 'ick' )}", attributes, true);
+    }
+
+    /**
+     * Covers quote characters inside quoted arguments: the other quote kind
+     * used unescaped, and the same quote kind escaped with a backslash.
+     */
+    @Test
+    public void testQuotingQuotes() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "say 'hi'");
+
+        // escaped double quotes inside a single-quoted replacement
+        final String escapedDoubleQuotes = "${xx:replaceAll( \"'.*'\", '\\\"hello\\\"' )}";
+        verifyEquals(escapedDoubleQuotes, attrs, "say \"hello\"");
+
+        // each quote kind appears unescaped inside the other kind
+        final String mixedQuotes = "${xx:replace( \"'\", '\"')}";
+        verifyEquals(mixedQuotes, attrs, "say \"hi\"");
+
+        // escaped single quote inside a single-quoted argument
+        final String escapedSingleQuote = "${xx:replace( '\\'', '\"')}";
+        System.out.println(escapedSingleQuote);
+        verifyEquals(escapedSingleQuote, attrs, "say \"hi\"");
+    }
+
+    /** Escaped double quotes inside a single-quoted replacement argument. */
+    @Test
+    public void testDoubleQuotesWithinSingleQuotes() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "say 'hi'");
+
+        final String expression = "${xx:replace( \"'hi'\", '\\\"hello\\\"' )}";
+        // printed for manual inspection of the escaping
+        System.out.println(expression);
+        verifyEquals(expression, attrs, "say \"hello\"");
+    }
+
+    /**
+     * An expression wrapped in literal double quotes, whose format argument
+     * itself contains single-quoted sections, is evaluated in place and
+     * reported as a single range.
+     */
+    @Test
+    public void testEscapeQuotes() {
+        final long timestamp = 1403620278642L;
+        final String format = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("date", String.valueOf(timestamp));
+
+        final String query = "startDateTime=\"${date:toNumber():toDate():format(\"" + format + "\")}\"";
+        final String actual = Query.evaluateExpressions(query, attrs, null);
+
+        // build the expected text with the same pattern
+        final SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.US);
+        final String expectedTime = formatter.format(timestamp);
+        assertEquals("startDateTime=\"" + expectedTime + "\"", actual);
+
+        final List<Range> found = Query.extractExpressionRanges(query);
+        assertEquals(1, found.size());
+    }
+
+    /**
+     * format('yyyy') must yield "2014" for the same epoch-millisecond value
+     * whether it is applied directly or after any chain of
+     * toDate()/toNumber() conversions.
+     */
+    @Test
+    public void testDateConversion() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("date", "1403620278642");
+
+        final String expectedYear = "2014";
+        verifyEquals("${date:format('yyyy')}", attrs, expectedYear);
+        verifyEquals("${date:toDate():format('yyyy')}", attrs, expectedYear);
+        verifyEquals("${date:toNumber():format('yyyy')}", attrs, expectedYear);
+        verifyEquals("${date:toNumber():toDate():format('yyyy')}", attrs, expectedYear);
+        verifyEquals("${date:toDate():toNumber():format('yyyy')}", attrs, expectedYear);
+        verifyEquals("${date:toDate():toNumber():toDate():toNumber():toDate():toNumber():format('yyyy')}", attrs, expectedYear);
+    }
+
+    /** A one-character attribute name resolves both bare and quoted. */
+    @Test
+    public void testSingleLetterAttribute() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("A", "0123456789");
+
+        final String expectedValue = "0123456789";
+        verifyEquals("${A}", attrs, expectedValue);
+        verifyEquals("${'A'}", attrs, expectedValue);
+    }
+
+    /**
+     * Verifies that string attribute values are converted implicitly to
+     * numbers or dates when a numeric or date function is applied to them.
+     */
+    @Test
+    public void testImplicitConversions() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("A", "0123456789");
+        attributes.put("b", "true");
+        attributes.put("c", "false");
+        attributes.put("d", "Quick Brown Fox");
+        attributes.put("F", "-48");
+        attributes.put("n", "2014/04/04 00:00:00");
+
+        // attribute "z" holds a date in Date.toString() form
+        final Calendar cal = Calendar.getInstance();
+        cal.set(Calendar.YEAR, 2014);
+        cal.set(Calendar.MONTH, 3);
+        cal.set(Calendar.DAY_OF_MONTH, 4);
+        cal.set(Calendar.HOUR, 0);
+        cal.set(Calendar.MINUTE, 0);
+        cal.set(Calendar.SECOND, 45);
+
+        final String dateString = cal.getTime().toString();
+        attributes.put("z", dateString);
+
+        verifyEquals("${A:plus(4)}", attributes, 123456793L);
+        verifyEquals("${A:plus( ${F} )}", attributes, 123456741L);
+
+        verifyEquals("${F:lt( ${A} )}", attributes, true);
+        verifyEquals("${A:substring(2,3):plus(21):substring(1,2):plus(0)}", attributes, 3L);
+        verifyEquals("${n:format( 'yyyy' )}", attributes, "2014");
+        verifyEquals("${z:format( 'yyyy' )}", attributes, "2014");
+
+        attributes.put("n", "2014/04/04 00:00:00.045");
+        // Literal restored to a single line; it had been split mid-string
+        verifyEquals("${n:format( 'yyyy' ):append(','):append( ${n:format( 'SSS' )} )}", attributes, "2014,045");
+    }
+
+    @Test
+    public void testNewLinesAndTabsInQuery() {
+        final String query = "${ abc:equals('abc'):or( \n\t${xx:isNull()}\n) 
}";
+        assertEquals(ResultType.BOOLEAN, Query.getResultType(query));
+        Query.validateExpression(query, false);
+        assertEquals("true", Query.evaluateExpressions(query));
+    }
+
+    /** A quoted attribute name may contain spaces and commas. */
+    @Test
+    public void testAttributeReferencesWithWhiteSpace() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a b c,d", "abc");
+
+        final String expression = "${ 'a b c,d':equals('abc') }";
+        verifyEquals(expression, attributes, true);
+    }
+
+    /**
+     * '#' comments before and inside an expression are skipped, while a '#'
+     * inside a quoted argument stays part of the string.
+     */
+    @Test
+    public void testComments() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "xyz");
+
+        // comment before the expression and comments around the attribute name
+        final String commented = "# hello, world\n"
+            + "${# ref attr\n"
+            + "\tabc\t"
+            + "#end ref attr\n"
+            + "}";
+
+        final QueryResult<?> plainResult = Query.compile(commented).evaluate(attrs);
+        assertEquals(ResultType.STRING, plainResult.getResultType());
+        assertEquals("xyz", plainResult.getValue());
+
+        // the quoted '#' is appended; the trailing comment is dropped
+        final QueryResult<?> appendResult = Query.compile("${abc:append('# hello') #good-bye \n}").evaluate(attrs);
+        assertEquals(ResultType.STRING, appendResult.getResultType());
+        assertEquals("xyz# hello", appendResult.getValue());
+    }
+
+    /**
+     * append('X') and prepend('Y') turn "XX" into "YXXX", which is then
+     * used as the name of the attribute to look up.
+     */
+    @Test
+    public void testAppendPrepend() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "XX");
+        attrs.put("YXXX", "bingo");
+
+        final String expression = "${${attr:append('X'):prepend('Y')}}";
+        verifyEquals(expression, attrs, "bingo");
+    }
+
+    /** isNull() is true for an attribute that is not in the map. */
+    @Test
+    public void testIsNull() {
+        final Map<String, String> noAttributes = new HashMap<>();
+        verifyEquals("${attr:isNull()}", noAttributes, true);
+    }
+
+    /** notNull() is true for an attribute that is set to the empty string. */
+    @Test
+    public void testNotNull() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "");
+
+        final String expression = "${attr:notNull()}";
+        verifyEquals(expression, attrs, true);
+    }
+
+    /**
+     * Combines isNull() with a length check through or(), for an empty, a
+     * non-empty, an unset and a whitespace-only attribute.
+     */
+    @Test
+    public void testIsNullOrLengthEquals0() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "");
+        attrs.put("xyz", "xyz");
+        attrs.put("xx", "  ");
+
+        // empty value: not null, but length 0
+        verifyEquals("${abc:isNull():or( ${abc:length():equals(0)} )}", attrs, true);
+        // non-empty value: neither condition holds
+        verifyEquals("${xyz:isNull():or( ${xyz:length():equals(0)} )}", attrs, false);
+        // "none" was never put into the map
+        verifyEquals("${none:isNull():or( ${none:length():equals(0)} )}", attrs, true);
+        // whitespace-only value: length is 0 after trim()
+        verifyEquals("${xx:isNull():or( ${xx:trim():length():equals(0)} )}", attrs, true);
+    }
+
+    /** replaceNull('hello') yields "hello" for an attribute that is not in the map. */
+    @Test
+    public void testReplaceNull() {
+        final Map<String, String> noAttributes = new HashMap<>();
+        verifyEquals("${attr:replaceNull('hello')}", noAttributes, "hello");
+    }
+
+    /** replace('hell', 'yell') turns "hello" into "yello". */
+    @Test
+    public void testReplace() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "hello");
+
+        final String expression = "${attr:replace('hell', 'yell')}";
+        verifyEquals(expression, attrs, "yello");
+    }
+
+    /**
+     * Exercises replaceAll() with capture groups, escaped dots, different
+     * numbers of backslashes in the pattern, and patterns that do not match.
+     */
+    @Test
+    public void testReplaceAll() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("attr", "hello");
+        attributes.put("xyz", "00-00TEST.2014_01_01_000000_value");
+
+        // group 3 is the two-digit field after the four-digit year
+        verifyEquals("${xyz:replaceAll(\"^([^.]+)\\.([0-9]{4})_([0-9]{2})_([0-9]{2}).*$\", \"$3\")}", attributes, "01");
+        verifyEquals("${attr:replaceAll('l+', 'r')}", attributes, "hero");
+
+        attributes.clear();
+        attributes.put("filename1", "abc.gz");
+        attributes.put("filename2", "abc.g");
+        attributes.put("filename3", "abc.gz.gz");
+        attributes.put("filename4", "abc.gz.g");
+        attributes.put("abc", "hello world");
+
+        // the expected values show these backslash-heavy patterns leaving the input unchanged
+        verifyEquals("${filename3:replaceAll('\\\\\\.gz$', '')}", attributes, "abc.gz.gz");
+        verifyEquals("${filename3:replaceAll('\\\\\\\\.gz$', '')}", attributes, "abc.gz.gz");
+        verifyEquals("${filename1:replaceAll('\\.gz$', '')}", attributes, "abc");
+        verifyEquals("${filename2:replaceAll('\\.gz$', '')}", attributes, "abc.g");
+        verifyEquals("${filename4:replaceAll('\\\\.gz$', '')}", attributes, "abc.gz.g");
+
+        // "hello world" literals restored to single lines; they had been split mid-string
+        verifyEquals("${abc:replaceAll( 'lo wor(ld)', '$0')}", attributes, "hello world");
+        verifyEquals("${abc:replaceAll( 'he(llo) world', '$1')}", attributes, "llo");
+        verifyEquals("${abc:replaceAll( 'xx', '$0')}", attributes, "hello world");
+        verifyEquals("${abc:replaceAll( '(xx)', '$1')}", attributes, "hello world");
+        verifyEquals("${abc:replaceAll( 'lo wor(ld)', '$1')}", attributes, "helld");
+
+    }
+
+    /**
+     * Exercises replace() and replaceAll() on a Windows-style path with
+     * patterns that contain different numbers of backslashes.
+     */
+    @Test
+    public void testReplaceAllWithOddNumberOfBackslashPairs() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "C:\\temp\\.txt");
+
+        final String forwardSlashed = "C:/temp/.txt";
+        verifyEquals("${filename:replace('\\\\', '/')}", attrs, forwardSlashed);
+        verifyEquals("${filename:replaceAll('\\\\\\\\', '/')}", attrs, forwardSlashed);
+        // removes the trailing "\.txt"
+        verifyEquals("${filename:replaceAll('\\\\\\.txt$', '')}", attrs, "C:\\temp");
+    }
+
+    /** replaceAll() with a '$1' replacement yields the captured group, "ll". */
+    @Test
+    public void testReplaceAllWithMatchingGroup() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("attr", "hello");
+
+        final String expression = "${attr:replaceAll('.*?(l+).*', '$1')}";
+        verifyEquals(expression, attrs, "ll");
+    }
+
+    /**
+     * Chains multiply, divide, plus and mod on a numeric attribute:
+     * 100 * 2 = 200, 200 / 3 = 66 (the expected 2 implies integer division),
+     * 66 + 1 = 67, 67 mod 5 = 2.
+     */
+    @Test
+    public void testMathOperations() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("one", "1");
+        attrs.put("two", "2");
+        attrs.put("three", "3");
+        attrs.put("four", "4");
+        attrs.put("five", "5");
+        attrs.put("hundred", "100");
+
+        final String expression = "${hundred:toNumber():multiply(2):divide(3):plus(1):mod(5)}";
+        verifyEquals(expression, attrs, 2L);
+    }
+
+    /** indexOf('/') returns the zero-based position of the first slash, 6 for this URL. */
+    @Test
+    public void testIndexOf() {
+        final Map<String, String> attributes = new HashMap<>();
+        // a stray ';' had crept in between the string literal and the closing parenthesis
+        attributes.put("attr", "https://abc.go");
+        verifyEquals("${attr:indexOf('/')}", attributes, 6L);
+    }
+
+    /**
+     * Formats the current time held as epoch milliseconds, then builds a
+     * date string from separate year/month/day attributes and formats it as
+     * day-of-year.
+     */
+    @Test
+    public void testDate() {
+        final Calendar now = Calendar.getInstance();
+        final int currentYear = now.get(Calendar.YEAR);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("entryDate", String.valueOf(now.getTimeInMillis()));
+
+        verifyEquals("${entryDate:toNumber():toDate():format('yyyy')}", attrs, String.valueOf(currentYear));
+
+        attrs.clear();
+        attrs.put("month", "3");
+        attrs.put("day", "4");
+        attrs.put("year", "2013");
+
+        // 4 March 2013 is day 63 of the year (31 + 28 + 4)
+        final String bareEmbedded = "${year:append('/'):append(${month}):append('/'):append(${day}):toDate('yyyy/MM/dd'):format('D')}";
+        final String quotedEmbedded = "${year:append('/'):append('${month}'):append('/'):append('${day}'):toDate('yyyy/MM/dd'):format('D')}";
+        assertEquals("63", Query.evaluateExpressions(bareEmbedded, attrs, null));
+        assertEquals("63", Query.evaluateExpressions(quotedEmbedded, attrs, null));
+
+        verifyEquals(bareEmbedded, attrs, "63");
+    }
+
+    /**
+     * With no attributes supplied, an expression is expected to resolve
+     * against JVM system properties.
+     */
+    @Test
+    public void testSystemProperty() {
+        // Remember the prior value so this test does not leak JVM-wide state into other tests
+        final String previous = System.getProperty("hello");
+        System.setProperty("hello", "good-bye");
+        try {
+            assertEquals("good-bye", Query.evaluateExpressions("${hello}"));
+            assertEquals("good-bye", Query.compile("${hello}").evaluate().getValue());
+        } finally {
+            if (previous == null) {
+                System.clearProperty("hello");
+            } else {
+                System.setProperty("hello", previous);
+            }
+        }
+    }
+
+    @Test
+    public void testAnyAttribute() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "zzz");
+        attributes.put("xyz", "abc");
+
+        verifyEquals("${anyAttribute('abc', 'xyz', 
'missingAttr'):substring(1,2):equals('b')}", attributes, true);
+        verifyEquals("${anyAttribute('abc', 
'xyz'):substring(1,2):equals('b')}", attributes, true);
+        verifyEquals("${anyAttribute('xyz', 
'abc'):substring(1,2):equals('b')}", attributes, true);
+        verifyEquals("${anyAttribute('zz'):substring(1,2):equals('b')}", 
attributes, false);
+        verifyEquals("${anyAttribute('abc', 'zz'):isNull()}", attributes, 
true);
+    }
+
+    @Test
+    public void testAnyMatchingAttribute() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "zzz");
+        attributes.put("xyz", "abc");
+        attributes.put("123.cba", "hello");
+
+        verifyEquals("${anyMatchingAttribute('.{2}x', 
'.{2}z'):substring(1,2):equals('b')}", attributes, true);
+        
verifyEquals("${anyMatchingAttribute('.*'):substring(1,2):equals('b')}", 
attributes, true);
+        
verifyEquals("${anyMatchingAttribute('x{44}'):substring(1,2):equals('b')}", 
attributes, false);
+        
verifyEquals("${anyMatchingAttribute('abc'):substring(1,2):equals('b')}", 
attributes, false);
+        
verifyEquals("${anyMatchingAttribute('xyz'):substring(1,2):equals('b')}", 
attributes, true);
+        verifyEquals("${anyMatchingAttribute('xyz'):notNull()}", attributes, 
true);
+        verifyEquals("${anyMatchingAttribute('xyz'):isNull()}", attributes, 
false);
+        verifyEquals("${anyMatchingAttribute('xxxxxxxxx'):notNull()}", 
attributes, false);
+        verifyEquals("${anyMatchingAttribute('123\\.c.*'):matches('hello')}", 
attributes, true);
+        
verifyEquals("${anyMatchingAttribute('123\\.c.*|a.c'):matches('zzz')}", 
attributes, true);
+    }
+
+    /**
+     * Checks {@code anyDelineatedValue(...)}: the subject is split on the
+     * delimiter and the condition is expected to hold if any piece satisfies it.
+     */
+    @Test
+    public void testAnyDelineatedValue() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "a,b,c");
+        attrs.put("xyz", "abc");
+
+        // Subject given as a quoted embedded expression.
+        final String quotedEqualsB = "${anyDelineatedValue('${abc}', ','):equals('b')}";
+        assertEquals(ResultType.BOOLEAN, Query.getResultType(quotedEqualsB));
+
+        assertEquals("true", Query.evaluateExpressions(quotedEqualsB, attrs, null));
+        assertEquals("true", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('a')}", attrs, null));
+        assertEquals("true", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('c')}", attrs, null));
+        assertEquals("false", Query.evaluateExpressions("${anyDelineatedValue('${abc}', ','):equals('d')}", attrs, null));
+
+        // Same checks with the embedded expression left unquoted.
+        verifyEquals("${anyDelineatedValue(${abc}, ','):equals('b')}", attrs, true);
+        verifyEquals("${anyDelineatedValue(${abc}, ','):equals('a')}", attrs, true);
+        verifyEquals("${anyDelineatedValue(${abc}, ','):equals('c')}", attrs, true);
+        verifyEquals("${anyDelineatedValue(${abc}, ','):equals('d')}", attrs, false);
+    }
+
+    /**
+     * Checks {@code allDelineatedValues(...)}: the subject is split on the
+     * delimiter and the condition is expected to hold only if every piece satisfies it.
+     */
+    @Test
+    public void testAllDelineatedValues() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "a,b,c");
+        attrs.put("xyz", "abc");
+
+        // Subject given as a quoted embedded expression.
+        final String allInAbc = "${allDelineatedValues('${abc}', ','):matches('[abc]')}";
+        final String allInAbd = "${allDelineatedValues('${abc}', ','):matches('[abd]')}";
+        final String noneEqualsA = "${allDelineatedValues('${abc}', ','):equals('a'):not()}";
+
+        assertEquals(ResultType.BOOLEAN, Query.getResultType(allInAbc));
+        // Evaluated twice, as in the original test.
+        assertEquals("true", Query.evaluateExpressions(allInAbc, attrs, null));
+        assertEquals("true", Query.evaluateExpressions(allInAbc, attrs, null));
+        assertEquals("false", Query.evaluateExpressions(allInAbd, attrs, null));
+        assertEquals("false", Query.evaluateExpressions(noneEqualsA, attrs, null));
+
+        // Same checks with the embedded expression left unquoted.
+        verifyEquals("${allDelineatedValues(${abc}, ','):matches('[abc]')}", attrs, true);
+        verifyEquals("${allDelineatedValues(${abc}, ','):matches('[abd]')}", attrs, false);
+        verifyEquals("${allDelineatedValues(${abc}, ','):equals('a'):not()}", attrs, false);
+    }
+
+    @Test
+    public void testAllAttributes() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "1234");
+        attributes.put("xyz", "4132");
+        attributes.put("hello", "world!");
+
+        verifyEquals("${allAttributes('abc', 'xyz'):matches('\\d+')}", 
attributes, true);
+        verifyEquals("${allAttributes('abc', 'xyz'):toNumber():lt(99999)}", 
attributes, true);
+        verifyEquals("${allAttributes('abc', 'hello'):length():gt(3)}", 
attributes, true);
+        verifyEquals("${allAttributes('abc', 'hello'):length():equals(4)}", 
attributes, false);
+        verifyEquals("${allAttributes('abc', 'xyz'):length():equals(4)}", 
attributes, true);
+        verifyEquals("${allAttributes('abc', 'xyz', 'other'):isNull()}", 
attributes, false);
+
+        try {
+            Query.compile("${allAttributes('#ah'):equals('hello')");
+            Assert.fail("Was able to compile with allAttributes and an invalid 
attribute name");
+        } catch (final AttributeExpressionLanguageParsingException e) {
+            // expected behavior
+        }
+    }
+
+    /**
+     * Verifies a numeric comparison whose argument is itself an embedded expression.
+     */
+    @Test
+    public void testMathOperators() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "1234");
+        attrs.put("xyz", "4132");
+        attrs.put("hello", "world!");
+
+        // 4132 > 1234
+        final String xyzGreaterThanAbc = "${xyz:toNumber():gt( ${abc:toNumber()} )}";
+        verifyEquals(xyzGreaterThanAbc, attrs, true);
+    }
+
+    /**
+     * Checks {@code allMatchingAttributes(...)}, which selects attributes by
+     * regular expression on their names and requires the chained condition
+     * to hold for every selected attribute.
+     */
+    @Test
+    public void testAllMatchingAttributes() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "1234");
+        attrs.put("xyz", "4132");
+        attrs.put("hello", "world!");
+        attrs.put("123.cba", "hell.o");
+
+        // Diagnostic output only: dump the parse tree of a representative expression.
+        final Query treeSample = Query.compile("${allMatchingAttributes('(abc|xyz)'):matches('\\\\d+')}");
+        System.out.println(printTree(treeSample.getTree()));
+
+        verifyEquals("${'123.cba':matches('hell\\.o')}", attrs, true);
+        verifyEquals("${allMatchingAttributes('123\\.cba'):equals('hell.o')}", attrs, true);
+        verifyEquals("${allMatchingAttributes('(abc|xyz)'):matches('\\d+')}", attrs, true);
+        verifyEquals("${allMatchingAttributes('[ax].*'):toNumber():lt(99999)}", attrs, true);
+        verifyEquals("${allMatchingAttributes('hell.'):length():gt(3)}", attrs, true);
+
+        verifyEquals("${allMatchingAttributes('123\\.cba'):equals('no')}", attrs, false);
+    }
+
+    /**
+     * Verifies {@code matches} with a pattern that embeds another attribute,
+     * re-evaluating after that attribute changes, plus an escaped-dot pattern.
+     */
+    @Test
+    public void testMatches() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "1234xyz4321");
+        attrs.put("end", "xyz");
+        attrs.put("xyz", "4132");
+        attrs.put("hello", "world!");
+        attrs.put("dotted", "abc.xyz");
+
+        final String embeddedPattern = "${abc:matches('1234${end}4321')}";
+
+        // With end=xyz the pattern becomes 1234xyz4321, which is the value of "abc".
+        assertEquals("true", Query.evaluateExpressions(embeddedPattern, attrs, null));
+
+        // With end=888 the same expression no longer matches.
+        attrs.put("end", "888");
+        assertEquals("false", Query.evaluateExpressions(embeddedPattern, attrs, null));
+
+        verifyEquals("${dotted:matches('abc\\.xyz')}", attrs, true);
+    }
+
+    /**
+     * Verifies {@code find} with patterns that embed another attribute,
+     * plus an escaped-dot pattern.
+     */
+    @Test
+    public void testFind() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "1234xyz4321");
+        attrs.put("end", "xyz");
+        attrs.put("xyz", "4132");
+        attrs.put("hello", "world!");
+        attrs.put("dotted", "abc.xyz");
+
+        // With end=xyz the pattern is 1234xyz4321, present in "abc".
+        final String firstResult = Query.evaluateExpressions("${abc:find('1234${end}4321')}", attrs, null);
+        assertEquals("true", firstResult);
+
+        // With end=888 the pattern 8884321 does not occur in "abc".
+        attrs.put("end", "888");
+        final String secondResult = Query.evaluateExpressions("${abc:find('${end}4321')}", attrs, null);
+        assertEquals("false", secondResult);
+
+        verifyEquals("${dotted:find('\\.')}", attrs, true);
+    }
+
+    /**
+     * Verifies the substringAfter/Before family with an empty search string
+     * (the subject is returned unchanged), a search string at the start of
+     * the subject, and a URI-like subject.
+     */
+    @Test
+    public void testSubstringAfter() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file-255");
+
+        // An empty search string leaves the subject untouched for all four functions.
+        verifyEquals("${filename:substringAfter('')}", attributes, "file-255");
+        verifyEquals("${filename:substringAfterLast('')}", attributes, "file-255");
+        verifyEquals("${filename:substringBefore('')}", attributes, "file-255");
+        verifyEquals("${filename:substringBeforeLast('')}", attributes, "file-255");
+        // Nothing precedes "file" in "file-255".
+        verifyEquals("${filename:substringBefore('file')}", attributes, "");
+
+        attributes.put("uri", "sftp://some.uri");
+        verifyEquals("${uri:substringAfter('sftp')}", attributes, "://some.uri");
+    }
+
+    /**
+     * Verifies {@code substringAfterLast} for a repeated search string, a
+     * search string ending the subject, and one that does not occur.
+     */
+    @Test
+    public void testSubstringAfterLast() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "file-file-255");
+
+        final String afterLastPrefix = "${filename:substringAfterLast('file-')}";
+        final String afterLastFive = "${filename:substringAfterLast('5')}";
+        final String afterAbsentChar = "${filename:substringAfterLast('x')}";
+
+        verifyEquals(afterLastPrefix, attrs, "255");
+        verifyEquals(afterLastFive, attrs, "");
+        // 'x' never occurs, so the whole subject comes back.
+        verifyEquals(afterAbsentChar, attrs, "file-file-255");
+    }
+
+    @Test
+    public void testSubstringBefore() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("something", "some {} or other");
+
+        verifyEquals("${something:substringBefore('}')}", attributes, "some 
{");
+    }
+
+    /**
+     * Verifies {@code substring} in its two-argument and one-argument forms.
+     */
+    @Test
+    public void testSubstring() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "file-255");
+
+        final String secondCharOnly = "${filename:substring(1, 2)}";
+        final String fromIndexFour = "${filename:substring(4)}";
+
+        verifyEquals(secondCharOnly, attrs, "i");
+        verifyEquals(fromIndexFour, attrs, "-255");
+    }
+
+    @Test
+    public void testToRadix() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file-255");
+        attributes.put("filename2", "file-99999");
+
+        
verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(16):toUpper()}",
 attributes, "FF");
+        verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(16, 
4):toUpper()}", attributes, "00FF");
+        verifyEquals("${filename:substringAfter('-'):toNumber():toRadix(36, 
3):toUpper()}", attributes, "073");
+    }
+
+    /**
+     * Verifies parsing a compact timestamp and re-formatting it with a
+     * pattern that contains a quoted literal 'Z'.
+     */
+    @Test
+    public void testDateFormatConversion() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("blue", "20130917162643");
+
+        final String reformat = "${blue:toDate('yyyyMMddHHmmss'):format(\"yyyy/MM/dd HH:mm:ss.SSS'Z'\")}";
+        verifyEquals(reformat, attrs, "2013/09/17 16:26:43.000Z");
+    }
+
+    /**
+     * Verifies that negating {@code notNull()} of an absent attribute yields true.
+     */
+    @Test
+    public void testNot() {
+        final Map<String, String> noAttributes = new HashMap<String, String>();
+        verifyEquals("${ab:notNull():not()}", noAttributes, true);
+    }
+
+    /**
+     * Verifies that quoted attribute names keep their spaces exactly: the
+     * two-space name resolves, while the one-space name is absent.
+     */
+    @Test
+    public void testAttributesWithSpaces() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("ab", "abc");
+        attrs.put("a  b", "abc");
+
+        verifyEquals("${ab}", attrs, "abc");
+        // Two spaces: matches the stored attribute name.
+        verifyEquals("${'a  b'}", attrs, "abc");
+        // One space: no such attribute, so replaceNull supplies the empty string.
+        verifyEquals("${'a b':replaceNull('')}", attrs, "");
+    }
+
+    /**
+     * Verifies {@code or} with a literal argument and with embedded
+     * expressions as the argument.
+     */
+    @Test
+    public void testOr() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename1", "xabc");
+        attrs.put("filename2", "yabc");
+        attrs.put("filename3", "abcxy");
+
+        verifyEquals("${filename1:startsWith('x'):or(true)}", attrs, true);
+
+        // Same attribute on both sides: starts with 'x' OR starts with 'y'.
+        verifyEquals("${filename1:startsWith('x'):or( ${filename1:startsWith('y')} )}", attrs, true);
+        verifyEquals("${filename2:startsWith('x'):or( ${filename2:startsWith('y')} )}", attrs, true);
+        verifyEquals("${filename3:startsWith('x'):or( ${filename3:startsWith('y')} )}", attrs, false);
+
+        // Different attributes on each side.
+        verifyEquals("${filename1:startsWith('x'):or( ${filename2:startsWith('y')} )}", attrs, true);
+        verifyEquals("${filename2:startsWith('x'):or( ${filename1:startsWith('y')} )}", attrs, false);
+    }
+
+    /**
+     * Verifies {@code and} with literal arguments, with embedded expressions,
+     * and with an embedded expression whose subject name contains a space.
+     */
+    @Test
+    public void testAnd() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename1", "xabc");
+        attrs.put("filename2", "yabc");
+        attrs.put("filename 3", "abcxy");
+
+        // Literal arguments, the second written with extra whitespace around ':' and the argument.
+        verifyEquals("${filename1:startsWith('x'):and(true)}", attrs, true);
+        verifyEquals("${filename1:startsWith('x') : and( false )}", attrs, false);
+
+        // Same attribute on both sides; note "filename3" (no space) is not defined here.
+        verifyEquals("${filename1:startsWith('x'):and( ${filename1:startsWith('y')} )}", attrs, false);
+        verifyEquals("${filename2:startsWith('x'):and( ${filename2:startsWith('y')} )}", attrs, false);
+        verifyEquals("${filename3:startsWith('x'):and( ${filename3:startsWith('y')} )}", attrs, false);
+
+        // Different attributes on each side.
+        verifyEquals("${filename1:startsWith('x'):and( ${filename2:startsWith('y')} )}", attrs, true);
+        verifyEquals("${filename2:startsWith('x'):and( ${filename1:startsWith('y')} )}", attrs, false);
+        verifyEquals("${filename1:startsWith('x'):and( ${'filename 3':endsWith('y')} )}", attrs, true);
+    }
+
+    /**
+     * Verifies a nested combination of {@code not}, {@code and} and
+     * {@code or} spread over several embedded expressions.
+     */
+    @Test
+    public void testAndOrNot() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename1", "xabc");
+        attrs.put("filename2", "yabc");
+        attrs.put("filename 3", "abcxy");
+
+        // Overall shape: (missing attribute is null) AND (false OR false OR true)
+        final StringBuilder expr = new StringBuilder();
+        expr.append("${");
+        expr.append("     'non-existing':notNull():not():and("); // true AND (
+        expr.append("     ${filename1:startsWith('y')"); // false
+        expr.append("     :or("); // OR
+        expr.append("       ${ filename1:startsWith('x'):and(false) }"); // false
+        expr.append("     ):or("); // OR
+        expr.append("       ${ filename2:endsWith('xxxx'):or( ${'filename 3':length():gt(1)} ) }"); // true
+        expr.append("     )}");
+        expr.append("     )");
+        expr.append("}");
+        final String query = expr.toString();
+
+        System.out.println(query);
+        verifyEquals(query, attrs, true);
+    }
+
+    /**
+     * Verifies that {@code and}, {@code or} and {@code not} can follow an
+     * {@code anyMatchingAttribute(...)} subject.
+     */
+    @Test
+    public void testAndOrLogicWithAnyAll() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename1", "xabc");
+        attrs.put("filename2", "yabc");
+        attrs.put("filename 3", "abcxy");
+
+        // Shared left-hand side: some attribute named filename* contains "abc".
+        final String anyContainsAbc = "${anyMatchingAttribute('filename.*'):contains('abc')";
+
+        verifyEquals(anyContainsAbc + ":and( ${filename2:equals('yabc')} )}", attrs, true);
+        verifyEquals(anyContainsAbc + ":and( ${filename2:equals('xabc')} )}", attrs, false);
+        verifyEquals(anyContainsAbc + ":not():or( ${filename2:equals('yabc')} )}", attrs, true);
+        verifyEquals(anyContainsAbc + ":not():or( ${filename2:equals('xabc')} )}", attrs, false);
+    }
+
+    /**
+     * Verifies that an attribute named "UUID" can be referenced when its name is quoted.
+     */
+    @Test
+    public void testKeywords() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("UUID", "123");
+
+        final String quotedName = "${ 'UUID':toNumber():equals(123) }";
+        verifyEquals(quotedName, attrs, true);
+    }
+
+    /**
+     * Verifies {@code equals} between a converted number and a numeric literal.
+     */
+    @Test
+    public void testEqualsNumber() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "123");
+
+        final String numberEquals = "${ abc:toNumber():equals(123) }";
+        verifyEquals(numberEquals, attrs, true);
+    }
+
+    /**
+     * Verifies that a quoted subject may embed an expression surrounded by
+     * literal characters: 'abc${b}cba' resolves to the attribute "abcxcba".
+     */
+    @Test
+    public void testSubjectAsEmbeddedExpressionWithSurroundChars() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("b", "x");
+        attrs.put("abcxcba", "hello");
+
+        final String expression = "${ 'abc${b}cba':substring(0, 1) }";
+        // First character of "hello"
+        assertEquals("h", Query.evaluateExpressions(expression, attrs, null));
+    }
+
+    /**
+     * Verifies that an expression ending in {@code toNumber()} is reported as NUMBER.
+     */
+    @Test
+    public void testToNumberFunctionReturnsNumberType() {
+        final ResultType reportedType = Query.getResultType("${header.size:toNumber()}");
+        assertEquals(ResultType.NUMBER, reportedType);
+    }
+
+    /**
+     * Verifies that {@code anyAttribute(...)} works inside an embedded
+     * expression passed as the argument of {@code and}.
+     */
+    @Test
+    public void testAnyAttributeEmbedded() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("a1", "test1");
+        attrs.put("b2", "2test");
+        attrs.put("c3", "3test3");
+
+        // a1 equals "test1", and "2test" (b2) contains '2'.
+        final String nestedAny = "${a1:equals('test1'):and( ${anyAttribute('a1','b2','c3'):contains('2')})}";
+        verifyEquals(nestedAny, attrs, true);
+    }
+
+    /**
+     * Verifies that an expression surrounded by literal curly braces is
+     * extracted and evaluated while the outer braces are kept as text.
+     */
+    @Test
+    public void testEvaluateWithinCurlyBraces() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("abc", "xyz");
+
+        final String text = "{ ${abc} }";
+
+        // Exactly one expression is found, without the outer braces.
+        final List<String> found = Query.extractExpressions(text);
+        assertEquals(1, found.size());
+        assertEquals("${abc}", found.get(0));
+
+        assertEquals("{ xyz }", Query.evaluateExpressions(text, attrs));
+    }
+
+    /**
+     * Verifies that {@code literal(...)} can act as the subject of further
+     * functions, for both a numeric and a string literal.
+     */
+    @Test
+    public void testLiteralFunction() {
+        final Map<String, String> noAttributes = Collections.<String, String> emptyMap();
+
+        final String numericLiteral = "${literal(2):gt(1)}";
+        final String stringLiteral = "${literal('hello'):substring(0, 1):equals('h')}";
+
+        verifyEquals(numericLiteral, noAttributes, true);
+        verifyEquals(stringLiteral, noAttributes, true);
+    }
+
+    /**
+     * Verifies the rule that a reducing function such as {@code count()} must
+     * be the last function of an expression, and shows the
+     * {@code literal(...)} workaround for chaining onto its result.
+     */
+    @Test
+    public void testFunctionAfterReduce() {
+        // count() is a 'reducing function' and must come last, so appending gt(2) is invalid.
+        final String reduceThenCompare = "${allMatchingAttributes('a.*'):contains('2'):count():gt(2)}";
+        assertFalse(Query.isValidExpression(reduceThenCompare));
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("a.1", "245");
+        attrs.put("a.2", "123");
+        attrs.put("a.3", "732");
+        attrs.put("a.4", "343");
+        attrs.put("a.5", "553");
+
+        // Three of the five values contain a '2'.
+        final String endsWithCount = "${allMatchingAttributes('a.*'):contains('2'):count()}";
+        assertTrue(Query.isValidExpression(endsWithCount));
+        verifyEquals(endsWithCount, attrs, 3L);
+
+        // To compare the count against 2, the 'aggregate' and 'reducing' functions are first
+        // evaluated as an inner expression; literal() then makes that result the subject of gt().
+        final String usingLiteral = "${literal(" + endsWithCount + "):gt(2)}";
+        assertTrue(Query.isValidExpression(usingLiteral));
+        verifyEquals(usingLiteral, attrs, true);
+
+        // Without a reducing function, further functions may follow the condition.
+        attrs.clear();
+        attrs.put("a1", "123");
+        attrs.put("a2", "321");
+
+        final String allContainTwo = "${allMatchingAttributes('a.*'):contains('2')";
+        verifyEquals(allContainTwo + "}", attrs, true);
+        verifyEquals(allContainTwo + ":toUpper():equals('TRUE')}", attrs, true);
+        verifyEquals(allContainTwo + ":equals('true'):and( ${literal(true)} )}", attrs, true);
+    }
+
+    /**
+     * Validates and evaluates {@code expression} against {@code attributes}
+     * and checks the outcome three ways: the string form returned by
+     * {@code Query.evaluateExpressions}, the result type of the compiled
+     * query, and the typed value of the compiled query.
+     *
+     * @param expression the expression text to validate and evaluate
+     * @param attributes the attribute map the expression is evaluated against
+     * @param expectedResult the expected value; a {@link Number} implies
+     *            NUMBER, a {@link Boolean} implies BOOLEAN, anything else STRING
+     */
+    private void verifyEquals(final String expression, final Map<String, String> attributes, final Object expectedResult) {
+        Query.validateExpression(expression, false);
+
+        // String-based evaluation path.
+        final String evaluatedText = Query.evaluateExpressions(expression, attributes, null);
+        assertEquals(String.valueOf(expectedResult), evaluatedText);
+
+        // Compiled-query path.
+        final QueryResult<?> outcome = Query.compile(expression).evaluate(attributes);
+
+        final ResultType expectedType;
+        if (expectedResult instanceof Number) {
+            expectedType = ResultType.NUMBER;
+        } else if (expectedResult instanceof Boolean) {
+            expectedType = ResultType.BOOLEAN;
+        } else {
+            expectedType = ResultType.STRING;
+        }
+        assertEquals(expectedType, outcome.getResultType());
+
+        assertEquals(expectedResult, outcome.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
 
b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
new file mode 100644
index 0000000..5acba8d
--- /dev/null
+++ 
b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.attribute.expression.language;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests for queries obtained through {@code Query.prepare(...)} and evaluated
+ * as {@code StandardPreparedQuery} instances.
+ */
+public class TestStandardPreparedQuery {
+
+    /** A single attribute reference, alone and surrounded by literal text. */
+    @Test
+    public void testSimpleReference() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "world");
+
+        assertEquals("world", evaluate("${xx}", attrs));
+        assertEquals("hello, world!", evaluate("hello, ${xx}!", attrs));
+    }
+
+    /** The value of one attribute is used as the name of another. */
+    @Test
+    public void testEmbeddedReference() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "yy");
+        attrs.put("yy", "world");
+
+        assertEquals("world", evaluate("${${xx}}", attrs));
+    }
+
+    /**
+     * Timing loop: evaluates one prepared query ten million times and prints
+     * the elapsed milliseconds. Ignored like its sibling benchmark so that
+     * automated builds do not pay for a measurement nobody reads.
+     */
+    @Test
+    @Ignore("Intended for manual performance testing; should not be run in an automated environment")
+    public void test10MIterations() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "world");
+
+        // Prepared once, outside the timed region.
+        final StandardPreparedQuery prepared = (StandardPreparedQuery) Query.prepare("${xx}");
+        final long start = System.nanoTime();
+        for (int i = 0; i < 10000000; i++) {
+            assertEquals("world", prepared.evaluateExpressions(attrs, null));
+        }
+        final long nanos = System.nanoTime() - start;
+        System.out.println(TimeUnit.NANOSECONDS.toMillis(nanos));
+    }
+
+    /**
+     * Timing loop for comparison: the same ten million evaluations through
+     * {@code Query.evaluateExpressions} without preparing the query first.
+     */
+    @Test
+    @Ignore("Takes too long")
+    public void test10MIterationsWithQuery() {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("xx", "world");
+
+        final long start = System.nanoTime();
+        for (int i = 0; i < 10000000; i++) {
+            assertEquals("world", Query.evaluateExpressions("${xx}", attrs));
+        }
+        final long nanos = System.nanoTime() - start;
+        System.out.println(TimeUnit.NANOSECONDS.toMillis(nanos));
+    }
+
+    /** Several expressions placed back to back between literal text. */
+    @Test
+    public void testSeveralSequentialExpressions() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("audience", "World");
+        attributes.put("comma", ",");
+        attributes.put("question", " how are you?");
+        assertEquals("Hello, World, how are you?!", evaluate("Hello, ${audience}${comma}${question}!", attributes));
+    }
+
+    /**
+     * Prepares {@code query} and evaluates it against {@code attrs}.
+     *
+     * @param query the text containing zero or more expressions
+     * @param attrs the attributes the expressions are evaluated against
+     * @return the evaluated text
+     */
+    private String evaluate(final String query, final Map<String, String> attrs) {
+        return ((StandardPreparedQuery) Query.prepare(query)).evaluateExpressions(attrs, null);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-flowfile-packager/pom.xml 
b/nifi-commons/nifi-flowfile-packager/pom.xml
new file mode 100644
index 0000000..7173f03
--- /dev/null
+++ b/nifi-commons/nifi-flowfile-packager/pom.xml
@@ -0,0 +1,34 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-flowfile-packager</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
new file mode 100644
index 0000000..ae16f99
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Contract for writing a FlowFile, meaning its content together with its
+ * attributes, to an output stream in some packaging format.
+ */
+public interface FlowFilePackager {
+
+    /**
+     * Packages one FlowFile onto {@code out}.
+     *
+     * @param in stream supplying the FlowFile content
+     * @param out stream that receives the packaged FlowFile
+     * @param attributes the FlowFile attributes to include in the package
+     * @param fileSize the content size; presumably the number of bytes
+     *            available from {@code in} (the V1 implementation in this
+     *            change uses it as the declared size of the content entry)
+     * @throws IOException if reading the content or writing the package fails
+     */
+    void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
new file mode 100644
index 0000000..479ac58
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.lang3.StringEscapeUtils;
+
+/**
+ * Packages a FlowFile as a TAR archive with two entries: an XML properties
+ * document holding the attributes ({@link #FILENAME_ATTRIBUTES}) followed by
+ * the raw content ({@link #FILENAME_CONTENT}).
+ */
+public class FlowFilePackagerV1 implements FlowFilePackager {
+
+    public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
+    public static final String FILENAME_CONTENT = "flowfile.content";
+    public static final int DEFAULT_TAR_PERMISSIONS = 0644;
+
+    /** Mode applied to both TAR entries. */
+    private final int tarPermissions;
+
+    /** Creates a packager whose entries use {@link #DEFAULT_TAR_PERMISSIONS}. */
+    public FlowFilePackagerV1() {
+        this(DEFAULT_TAR_PERMISSIONS);
+    }
+
+    /**
+     * @param tarPermissions the mode to set on both TAR entries
+     */
+    public FlowFilePackagerV1(final int tarPermissions) {
+        this.tarPermissions = tarPermissions;
+    }
+
+    /**
+     * Writes the attributes entry and then the content entry to a TAR stream
+     * wrapped around {@code out}. The TAR stream is closed on exit, which
+     * also closes {@code out}.
+     *
+     * @param in stream supplying the content; it is read to its end but not closed
+     * @param out destination of the TAR archive
+     * @param attributes attributes to serialize into the attributes entry
+     * @param fileSize declared size of the content entry in bytes
+     * @throws IOException if reading or writing fails
+     */
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        // try-with-resources performs the close; no explicit close() is needed.
+        try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) {
+            writeAttributesEntry(attributes, tout);
+            writeContentEntry(tout, in, fileSize);
+            tout.finish();
+            tout.flush();
+        }
+    }
+
+    /**
+     * Serializes the attributes as an XML properties document (UTF-8) and
+     * writes it as the {@link #FILENAME_ATTRIBUTES} entry.
+     */
+    private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n  SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n");
+        sb.append("<properties>");
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            // Keys and values are XML-escaped so that markup characters cannot break the document.
+            final String escapedKey = StringEscapeUtils.escapeXml11(entry.getKey());
+            final String escapedValue = StringEscapeUtils.escapeXml11(entry.getValue());
+            sb.append("\n  <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>");
+        }
+        sb.append("</properties>");
+
+        final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
+        final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES);
+        attribEntry.setMode(tarPermissions);
+        // The entry size must be declared before the bytes are written.
+        attribEntry.setSize(metaBytes.length);
+        tout.putArchiveEntry(attribEntry);
+        tout.write(metaBytes);
+        tout.closeArchiveEntry();
+    }
+
+    /**
+     * Streams the content into the {@link #FILENAME_CONTENT} entry, whose
+     * size is declared as {@code fileSize}.
+     */
+    private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException {
+        final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT);
+        entry.setMode(tarPermissions);
+        entry.setSize(fileSize);
+        tarOut.putArchiveEntry(entry);
+
+        // Single pass until end-of-stream. A second copy pass used to follow this loop;
+        // it could only read past EOF and has been removed.
+        final byte[] buffer = new byte[512 << 10]; // 512KB
+        int bytesRead;
+        while ((bytesRead = inStream.read(buffer)) != -1) {
+            if (bytesRead > 0) {
+                tarOut.write(buffer, 0, bytesRead);
+            }
+        }
+
+        tarOut.closeArchiveEntry();
+    }
+
+    /**
+     * Copies bytes from {@code source} to {@code destination} until a read
+     * returns zero or a negative count. Neither stream is closed.
+     *
+     * @param source stream to read from
+     * @param destination stream to write to
+     * @return the total number of bytes copied
+     * @throws IOException if reading or writing fails
+     */
+    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long totalCount = 0L;
+        while ((len = source.read(buffer)) > 0) {
+            destination.write(buffer, 0, len);
+            totalCount += len;
+        }
+        return totalCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
new file mode 100644
index 0000000..6f9d6b1
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * <p>
+ * Packages a FlowFile, including both its content and its attributes into a
+ * single file that is stream-friendly. The encoding scheme is as such:
+ * </p>
+ *
+ * <pre>
+ * Length Field : indicates the number of Flow File Attributes in the stream
+ * 1 to N times (N=number of Flow File Attributes):
+ *      String Field : Flow File Attribute key name
+ *      String Field : Flow File Attribute value
+ * Long : 8 bytes indicating the length of the Flow File content
+ * Content : The next M bytes are the content of the Flow File.
+ * </pre>
+ *
+ * <pre>
+ * Encoding of String Field is as follows:
+ *      Length Field : indicates the length of the String
+ *      1 to N bytes (N=String length, determined by previous field, as 
described above) : The UTF-8 encoded string value.
+ * </pre>
+ *
+ * <pre>
+ * Encoding of Length Field is as follows:
+ *      First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is
+ *                     greater than or equal to 65535 bytes; therefore, the next 4 bytes will indicate the actual length.
+ * </pre>
+ *
+ * <p>
+ * Note: All byte-order encoding is Network Byte Order (Most Significant Byte
+ * first)
+ * </p>
+ *
+ * <p>
+ * The following example shows the bytes expected if we were to encode a
+ * FlowFile containing the following attributes where the content is the text
+ * "Hello World!":
+ *
+ * <br><br>
+ * Attributes:
+ * <pre>
+ * +-------+-------+
+ * | Key   + Value |
+ * + --------------+
+ * | A     | a     |
+ * + --------------+
+ * | B     | b     |
+ * + --------------+
+ * </pre> Content:<br>
+ * Hello World!
+ * <br><br>
+ * Packaged Byte Encoding (In Hexadecimal Form):
+ * <p>
+ *
+ * <pre>
+ * 00 02 00 01 41 00 01 61
+ * 00 01 42 00 01 62 00 00
+ * 00 00 00 00 00 0C 48 65
+ * 6C 6C 6F 20 57 6F 72 6C
+ * 64 21
+ * </pre>
+ */
+public class FlowFilePackagerV2 implements FlowFilePackager {
+
+    /** Threshold for the short length form: values below it are written in two bytes. */
+    private static final int MAX_VALUE_2_BYTES = 65535;
+    /** Scratch space reused by the length and long encoders. */
+    private final byte[] writeBuffer = new byte[8];
+
+    /**
+     * Writes the attribute count, each attribute key/value pair, the declared
+     * content length and then the content read from {@code in}.
+     *
+     * @param in source of the FlowFile content; read until a read yields no data
+     * @param out destination of the packaged bytes
+     * @param attributes attributes to encode; must not be {@code null}
+     * @param fileSize content length written ahead of the payload
+     * @throws IOException if reading or writing fails
+     */
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        // Header: how many key/value pairs follow.
+        writeFieldLength(out, attributes.size());
+        for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
+            writeString(attribute.getKey(), out);
+            writeString(attribute.getValue(), out);
+        }
+        // Payload: eight-byte length, then the raw content.
+        writeLong(out, fileSize);
+        copy(in, out);
+    }
+
+    /**
+     * Streams {@code in} to {@code out} in 64 KB chunks until a read yields no data.
+     */
+    private void copy(final InputStream in, final OutputStream out) throws IOException {
+        final byte[] chunk = new byte[65536];
+        for (int count = in.read(chunk); count > 0; count = in.read(chunk)) {
+            out.write(chunk, 0, count);
+        }
+    }
+
+    /**
+     * Writes a String Field: the length of the UTF-8 encoding followed by the encoded bytes.
+     */
+    private void writeString(final String val, final OutputStream out) throws IOException {
+        final byte[] encoded = val.getBytes("UTF-8");
+        writeFieldLength(out, encoded.length);
+        out.write(encoded);
+    }
+
+    /**
+     * Writes a Length Field, most significant byte first. Values below
+     * {@code MAX_VALUE_2_BYTES} take two bytes; every other value is written as
+     * the marker 0xFF 0xFF followed by the value in four bytes.
+     */
+    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
+        if (numBytes >= MAX_VALUE_2_BYTES) {
+            // Extended form: marker bytes, then the real length.
+            writeBuffer[0] = (byte) 0xff;
+            writeBuffer[1] = (byte) 0xff;
+            writeBuffer[2] = (byte) (numBytes >>> 24);
+            writeBuffer[3] = (byte) (numBytes >>> 16);
+            writeBuffer[4] = (byte) (numBytes >>> 8);
+            writeBuffer[5] = (byte) (numBytes);
+            out.write(writeBuffer, 0, 6);
+            return;
+        }
+
+        // Short form: the value itself in two bytes.
+        writeBuffer[0] = (byte) (numBytes >>> 8);
+        writeBuffer[1] = (byte) (numBytes);
+        out.write(writeBuffer, 0, 2);
+    }
+
+    /**
+     * Writes {@code val} as eight bytes, most significant byte first.
+     */
+    private void writeLong(final OutputStream out, final long val) throws IOException {
+        for (int i = 0; i < 8; i++) {
+            writeBuffer[i] = (byte) (val >>> (56 - 8 * i));
+        }
+        out.write(writeBuffer, 0, 8);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
new file mode 100644
index 0000000..181f3e3
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+public class FlowFilePackagerV3 implements FlowFilePackager {
+
+    /** Bytes written at the start of every packaged FlowFile. */
+    public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
+    /** Threshold for the short length form: values below it are written in two bytes. */
+    private static final int MAX_VALUE_2_BYTES = 65535;
+    /** Scratch space reused by the length and long encoders. */
+    private final byte[] writeBuffer = new byte[8];
+
+    /**
+     * Writes the magic header, the attribute count, each attribute key/value
+     * pair, the declared content length and then the content read from {@code in}.
+     *
+     * @param in source of the FlowFile content; read until a read yields no data
+     * @param out destination of the packaged bytes
+     * @param attributes attributes to encode; {@code null} is written as a count of zero
+     * @param fileSize content length written ahead of the payload
+     * @throws IOException if reading or writing fails
+     */
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        out.write(MAGIC_HEADER);
+
+        // A missing attribute map is encoded the same way as an empty one.
+        final int attributeCount = (attributes == null) ? 0 : attributes.size();
+        writeFieldLength(out, attributeCount);
+        if (attributes != null) {
+            for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
+                writeString(attribute.getKey(), out);
+                writeString(attribute.getValue(), out);
+            }
+        }
+
+        // Payload: eight-byte length, then the raw content.
+        writeLong(out, fileSize);
+        copy(in, out);
+    }
+
+    /**
+     * Streams {@code in} to {@code out} in 64 KB chunks until a read yields no data.
+     */
+    private void copy(final InputStream in, final OutputStream out) throws IOException {
+        final byte[] chunk = new byte[65536];
+        int count = in.read(chunk);
+        while (count > 0) {
+            out.write(chunk, 0, count);
+            count = in.read(chunk);
+        }
+    }
+
+    /**
+     * Writes the length of the UTF-8 encoding of {@code val} followed by the encoded bytes.
+     */
+    private void writeString(final String val, final OutputStream out) throws IOException {
+        final byte[] encoded = val.getBytes("UTF-8");
+        writeFieldLength(out, encoded.length);
+        out.write(encoded);
+    }
+
+    /**
+     * Writes a length value, most significant byte first. Values below
+     * {@code MAX_VALUE_2_BYTES} take two bytes; every other value is written as
+     * the marker 0xFF 0xFF followed by the value in four bytes.
+     */
+    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
+        final boolean shortForm = numBytes < MAX_VALUE_2_BYTES;
+        int used = 0;
+        if (!shortForm) {
+            // Extended form starts with the two marker bytes and the upper half of the value.
+            writeBuffer[used++] = (byte) 0xff;
+            writeBuffer[used++] = (byte) 0xff;
+            writeBuffer[used++] = (byte) (numBytes >>> 24);
+            writeBuffer[used++] = (byte) (numBytes >>> 16);
+        }
+        // Both forms end with the two low-order bytes.
+        writeBuffer[used++] = (byte) (numBytes >>> 8);
+        writeBuffer[used++] = (byte) (numBytes);
+        out.write(writeBuffer, 0, used);
+    }
+
+    /**
+     * Writes {@code val} as eight bytes, most significant byte first.
+     */
+    private void writeLong(final OutputStream out, final long val) throws IOException {
+        int shift = 56;
+        for (int i = 0; i < 8; i++, shift -= 8) {
+            writeBuffer[i] = (byte) (val >>> shift);
+        }
+        out.write(writeBuffer, 0, 8);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
new file mode 100644
index 0000000..fd9d92d
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Reads packaged FlowFiles back from a stream.
+ */
+public interface FlowFileUnpackager {
+
+    /**
+     * Reports whether another call to
+     * {@link #unpackageFlowFile(InputStream, OutputStream)} may be attempted.
+     *
+     * @return {@code true} if more data may be available
+     * @throws IOException if the check fails
+     */
+    boolean hasMoreData() throws IOException;
+
+    /**
+     * Unpacks one FlowFile from {@code in}.
+     *
+     * @param in stream holding the packaged FlowFile
+     * @param out stream handed to the implementation together with {@code in}
+     * @return a map of attribute names to values
+     * @throws IOException if unpacking fails
+     */
+    Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
new file mode 100644
index 0000000..b96534a
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+
+public class FlowFileUnpackagerV1 implements FlowFileUnpackager {
+
+    private int flowFilesRead = 0;
+
+    @Override
+    public Map<String, String> unpackageFlowFile(final InputStream in, final 
OutputStream out) throws IOException {
+        flowFilesRead++;
+        final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);
+        final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
+        if (attribEntry == null) {
+            return null;
+        }
+
+        final Map<String, String> attributes;
+        if 
(attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
+            attributes = getAttributes(tarIn);
+        } else {
+            throw new IOException("Expected two tar entries: "
+                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
+                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
+        }
+
+        final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();
+
+        if (contentEntry != null && 
contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
+            final byte[] buffer = new byte[512 << 10];//512KB
+            int bytesRead = 0;
+            while ((bytesRead = tarIn.read(buffer)) != -1) { //still more data 
to read
+                if (bytesRead > 0) {
+                    out.write(buffer, 0, bytesRead);
+                }
+            }
+            out.flush();
+        } else {
+            throw new IOException("Expected two tar entries: "
+                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
+                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
+        }
+
+        return attributes;
+    }
+
+    protected Map<String, String> getAttributes(final TarArchiveInputStream 
stream) throws IOException {
+
+        final Properties props = new Properties();
+        props.loadFromXML(new NonCloseableInputStream(stream));
+
+        final Map<String, String> result = new HashMap<>();
+        for (final Entry<Object, Object> entry : props.entrySet()) {
+            final Object keyObject = entry.getKey();
+            final Object valueObject = entry.getValue();
+            if (!(keyObject instanceof String)) {
+                throw new IOException("Flow file attributes object contains 
key of type "
+                        + keyObject.getClass().getCanonicalName()
+                        + " but expected java.lang.String");
+            } else if (!(keyObject instanceof String)) {
+                throw new IOException("Flow file attributes object contains 
value of type "
+                        + keyObject.getClass().getCanonicalName()
+                        + " but expected java.lang.String");
+            }
+
+            final String key = (String) keyObject;
+            final String value = (String) valueObject;
+            result.put(key, value);
+        }
+
+        return result;
+    }
+
+    @Override
+    public boolean hasMoreData() throws IOException {
+        return flowFilesRead == 0;
+    }
+
+    public static final class NonCloseableInputStream extends InputStream {
+
+        final InputStream stream;
+
+        public NonCloseableInputStream(final InputStream stream) {
+            this.stream = stream;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public int read() throws IOException {
+            return stream.read();
+        }
+
+        @Override
+        public int available() throws IOException {
+            return stream.available();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            stream.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            stream.reset();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return stream.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            return stream.skip(n);
+        }
+
+        @Override
+        public int read(byte b[], int off, int len) throws IOException {
+            return stream.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte b[]) throws IOException {
+            return stream.read(b);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java
 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java
new file mode 100644
index 0000000..500015f
--- /dev/null
+++ 
b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unpacks FlowFiles from a stream of length-prefixed attributes followed by a
+ * length-prefixed payload. After each FlowFile the attributes of the next one
+ * are read ahead so that {@link #hasMoreData()} can report whether another
+ * FlowFile follows.
+ */
+public class FlowFileUnpackagerV2 implements FlowFileUnpackager {
+
+    /** Scratch buffer shared by {@link #readLong(InputStream)} and {@link #copy(InputStream, OutputStream, long)}. */
+    private final byte readBuffer[] = new byte[8192];
+    /** Attributes read ahead at the end of the previous unpack; {@code null} when none were found. */
+    private Map<String, String> nextAttributes = null;
+    /** Set once a FlowFile has been unpacked without an exception. */
+    private boolean haveReadSomething = false;
+
+    /**
+     * @return {@code true} if attributes for a further FlowFile have been read
+     *         ahead, or if nothing has been unpacked yet
+     */
+    @Override
+    public boolean hasMoreData() throws IOException {
+        return nextAttributes != null || !haveReadSomething;
+    }
+
+    /**
+     * Reads the attribute count and that many key/value pairs.
+     *
+     * @param in stream to read from
+     * @return the attributes, or {@code null} if the stream is already at its end
+     * @throws IOException if the count is zero or negative, or if the stream
+     *             ends before all pairs have been read
+     */
+    protected Map<String, String> readAttributes(final InputStream in) throws IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        final Integer numAttributes = readFieldLength(in); //read number of attributes
+        if (numAttributes == null) {
+            return null;
+        }
+        if (numAttributes == 0) {
+            throw new IOException("flow files cannot have zero attributes");
+        }
+        if (numAttributes < 0) {
+            // A four-byte length with the high bit set decodes to a negative int.
+            throw new IOException("Invalid attribute count " + numAttributes + "; stream appears to be corrupt");
+        }
+        for (int i = 0; i < numAttributes; i++) { //read each attribute key/value pair
+            final String key = readString(in);
+            final String value = readString(in);
+            attributes.put(key, value);
+        }
+
+        return attributes;
+    }
+
+    /**
+     * Unpacks one FlowFile: uses the attributes read ahead by the previous call
+     * (or reads them now), copies the payload to {@code out}, and then reads
+     * ahead the attributes of the following FlowFile.
+     *
+     * @param in stream holding the packaged FlowFile(s)
+     * @param out receives the payload bytes
+     * @return the attributes of the FlowFile that was unpacked
+     * @throws IOException if the stream is truncated or malformed
+     */
+    @Override
+    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
+        final Map<String, String> attributes;
+        if (nextAttributes != null) {
+            attributes = nextAttributes;
+        } else {
+            attributes = readAttributes(in);
+        }
+
+        final long expectedNumBytes = readLong(in); // read length of payload
+        copy(in, out, expectedNumBytes); // read payload
+
+        nextAttributes = readAttributes(in);
+        haveReadSomething = true;
+
+        return attributes;
+    }
+
+    /**
+     * Reads a length-prefixed UTF-8 string.
+     *
+     * @throws EOFException if the stream ends before the string is complete
+     * @throws IOException if the encoded length is negative
+     */
+    protected String readString(final InputStream in) throws IOException {
+        final Integer numBytes = readFieldLength(in);
+        if (numBytes == null) {
+            throw new EOFException();
+        }
+        if (numBytes < 0) {
+            // Fail with an IOException instead of a NegativeArraySizeException below.
+            throw new IOException("Invalid string length " + numBytes + "; stream appears to be corrupt");
+        }
+        final byte[] bytes = new byte[numBytes];
+        fillBuffer(in, bytes, numBytes);
+        return new String(bytes, "UTF-8");
+    }
+
+    /**
+     * Reads exactly {@code length} bytes into the start of {@code buffer}.
+     *
+     * @throws EOFException if fewer than {@code length} bytes could be read
+     */
+    private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+        int bytesRead;
+        int totalBytesRead = 0;
+        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
+            totalBytesRead += bytesRead;
+        }
+        if (totalBytesRead != length) {
+            throw new EOFException();
+        }
+    }
+
+    /**
+     * Copies {@code numBytes} bytes from {@code in} to {@code out}.
+     *
+     * @return the number of bytes copied
+     * @throws EOFException if the stream ends before {@code numBytes} bytes were copied
+     * @throws IOException if {@code numBytes} is negative
+     */
+    protected long copy(final InputStream in, final OutputStream out, final long numBytes) throws IOException {
+        if (numBytes < 0) {
+            // A negative length would turn into an invalid or arbitrary read size below.
+            throw new IOException("Invalid payload length " + numBytes + "; stream appears to be corrupt");
+        }
+
+        int bytesRead;
+        long totalBytesRead = 0L;
+        while ((bytesRead = in.read(readBuffer, 0, (int) Math.min(readBuffer.length, numBytes - totalBytesRead))) > 0) {
+            out.write(readBuffer, 0, bytesRead);
+            totalBytesRead += bytesRead;
+        }
+
+        if (totalBytesRead < numBytes) {
+            throw new EOFException("Expected " + numBytes + " but received " + totalBytesRead);
+        }
+
+        return totalBytesRead;
+    }
+
+    /**
+     * Reads eight bytes and combines them into a long, most significant byte first.
+     *
+     * @throws EOFException if fewer than eight bytes are available
+     */
+    protected long readLong(final InputStream in) throws IOException {
+        fillBuffer(in, readBuffer, 8);
+        return (((long) readBuffer[0] << 56)
+                + ((long) (readBuffer[1] & 255) << 48)
+                + ((long) (readBuffer[2] & 255) << 40)
+                + ((long) (readBuffer[3] & 255) << 32)
+                + ((long) (readBuffer[4] & 255) << 24)
+                + ((readBuffer[5] & 255) << 16)
+                + ((readBuffer[6] & 255) << 8)
+                + ((readBuffer[7] & 255)));
+    }
+
+    /**
+     * Reads a length field: two bytes, or the marker 0xFF 0xFF followed by a
+     * four-byte value.
+     *
+     * @return the decoded value, or {@code null} if the stream was already at
+     *         its end when the first byte was read
+     * @throws EOFException if the stream ends part-way through the field
+     */
+    private Integer readFieldLength(final InputStream in) throws IOException {
+        final int firstValue = in.read();
+        final int secondValue = in.read();
+        if (firstValue < 0) {
+            return null;
+        }
+        if (secondValue < 0) {
+            throw new EOFException();
+        }
+        if (firstValue == 0xff && secondValue == 0xff) {
+            int ch1 = in.read();
+            int ch2 = in.read();
+            int ch3 = in.read();
+            int ch4 = in.read();
+            // Any read that hit end-of-stream returned -1, which makes the OR negative.
+            if ((ch1 | ch2 | ch3 | ch4) < 0) {
+                throw new EOFException();
+            }
+            return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+        } else {
+            return ((firstValue << 8) + (secondValue));
+        }
+    }
+}

Reply via email to