Add BigQuery Verifier to WindowedWordCountIT
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd46523d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd46523d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd46523d

Branch: refs/heads/master
Commit: dd46523dc2bca4aee11265a2fb065cc137920b1d
Parents: 8e225d7
Author: Mark Liu <mark...@markliu-macbookpro.roam.corp.google.com>
Authored: Thu Oct 6 14:34:55 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 19 16:22:57 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCountIT.java | 11 +++++++++++
 .../beam/examples/cookbook/BigQueryTornadoesIT.java | 2 +-
 .../apache/beam/sdk/testing/BigqueryMatcher.java | 16 ++++++++++------
 .../beam/sdk/testing/BigqueryMatcherTest.java | 2 +-
 4 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 379d1b0..6742654 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -19,8 +19,10 @@ package org.apache.beam.examples;

 import java.io.IOException;
 import org.apache.beam.examples.WindowedWordCount.Options;
+import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.BigqueryMatcher;
 import org.apache.beam.sdk.testing.StreamingIT;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -40,6 +42,9 @@ public class WindowedWordCountIT {
    */
   public interface WindowedWordCountITOptions
       extends Options, TestPipelineOptions, StreamingOptions {
+    @Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4")
+    String getChecksum();
+    void setChecksum(String value);
   }

   @Test
@@ -59,6 +64,12 @@ public class WindowedWordCountIT {
         TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
     options.setStreaming(isStreaming);

+    String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word",
+        options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
+    options.setOnSuccessMatcher(
+        new BigqueryMatcher(
+            options.getAppName(), options.getProject(), query, options.getChecksum()));
+
     WindowedWordCount.main(TestPipeline.convertToArgs(options));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 7e15389..27a5a8f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -39,7 +39,7 @@ public class BigQueryTornadoesIT {
    */
   public interface BigQueryTornadoesITOptions
       extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions {
-    @Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f")
+    @Default.String("1ab4c7ec460b94bbb3c3885b178bf0e6bed56e1f")
     String getChecksum();
     void setChecksum(String value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 7646caa..95208ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -40,8 +40,10 @@ import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -182,14 +184,16 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
     return credential;
   }

-  private String generateHash(List<TableRow> rows) {
+  private String generateHash(@Nonnull List<TableRow> rows) {
     List<HashCode> rowHashes = Lists.newArrayList();
     for (TableRow row : rows) {
-      List<HashCode> cellHashes = Lists.newArrayList();
+      List<String> cellsInOneRow = Lists.newArrayList();
       for (TableCell cell : row.getF()) {
-        cellHashes.add(Hashing.sha1().hashString(cell.toString(), StandardCharsets.UTF_8));
+        cellsInOneRow.add(Objects.toString(cell.getV()));
+        Collections.sort(cellsInOneRow);
       }
-      rowHashes.add(Hashing.combineUnordered(cellHashes));
+      rowHashes.add(
+          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
     }
     return Hashing.combineUnordered(rowHashes).toString();
   }
@@ -222,13 +226,13 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
     StringBuilder samples = new StringBuilder();
     List<TableRow> rows = response.getRows();
     for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
-      samples.append("\n\t\t");
+      samples.append(String.format("%n\t\t"));
       for (TableCell field : rows.get(i).getF()) {
         samples.append(String.format("%-10s", field.getV()));
       }
     }
     if (rows.size() > totalNumRows) {
-      samples.append("\n\t\t....");
+      samples.append(String.format("%n\t\t..."));
     }
     return samples.toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
index 1c427f8..d0ae765 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -74,7 +74,7 @@ public class BigqueryMatcherTest {
   public void testBigqueryMatcherThatSucceeds() throws Exception {
     BigqueryMatcher matcher = spy(
         new BigqueryMatcher(
-            appName, projectId, query, "8d1bbbf1f523f924b98c88b00c5811e041c2f855"));
+            appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
     doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
     when(mockQuery.execute()).thenReturn(createResponseContainingTestData());