[
https://issues.apache.org/jira/browse/BEAM-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339460#comment-16339460
]
ASF GitHub Bot commented on BEAM-3160:
--
iemejia closed pull request #4433: [BEAM-3160] Prevent issue where we would
choose which coder to use arbitrarily when it is over specified with multiple
coders.
URL: https://github.com/apache/beam/pull/4433
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
index bc2ef3fe2dd..76b1fb47081 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
@@ -82,7 +82,7 @@ public Throwable getRootCause() {
*/
public enum ReasonCode {
/**
- * The reason a coder could not be provided is unknown or does have an
established
+ * The reason a coder could not be provided is unknown or does not have an
established
* {@link ReasonCode}.
*/
UNKNOWN,
@@ -91,6 +91,12 @@ public Throwable getRootCause() {
* The reason a coder could not be provided is type erasure, for example
when requesting
* coder inference for a {@code List<T>} where {@code T} is unknown.
*/
-TYPE_ERASURE
+TYPE_ERASURE,
+
+/**
+ * The reason a coder could not be provided is because the type variable
{@code T} is
+ * over specified with multiple incompatible coders.
+ */
+OVER_SPECIFIED
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index e4cb55acf18..b867d935c49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -20,10 +20,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@@ -246,7 +249,7 @@ public void registerCoderForType(TypeDescriptor type,
Coder coder) {
* @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
public Coder getCoder(TypeDescriptor type) throws
CannotProvideCoderException {
-return getCoderFromTypeDescriptor(type, Collections.emptyMap());
+return getCoderFromTypeDescriptor(type, HashMultimap.create());
}
/**
@@ -433,7 +436,7 @@ public void registerCoderForType(TypeDescriptor type,
Coder coder) {
baseClass.getCanonicalName(), typeArgs.length,
knownCoders.length));
}
-Map context = new HashMap<>();
+SetMultimap context = HashMultimap.create();
for (int i = 0; i < knownCoders.length; i++) {
if (knownCoders[i] != null) {
try {
@@ -591,12 +594,23 @@ private static boolean isNullOrEmpty(Collection c) {
* @throws CannotProvideCoderException if a coder cannot be provided
*/
private Coder getCoderFromTypeDescriptor(
- TypeDescriptor typeDescriptor, Map typeCoderBindings)
+ TypeDescriptor typeDescriptor, SetMultimap
typeCoderBindings)
throws CannotProvideCoderException {
Type type = typeDescriptor.getType();
Coder coder;
if (typeCoderBindings.containsKey(type)) {
- coder = typeCoderBindings.get(type);
+ Set coders = typeCoderBindings.get(type);
+ if (coders.size() == 1) {
+coder = Iterables.getOnlyElement(coders);
+ } else {
+throw new CannotProvideCoderException(
+String.format("Cannot provide a coder for type variable %s"
++ " because the actual type is over specified by multiple"
++ " incompatible coders %s.",
+type,
+coders),
+ReasonCode.OVER_SPECIFIED);
+ }
} else if (type instanceof Class) {
coder = getCoderFromFactories(typeDescriptor, Collections.emptyList());
} else if (type instanceof ParameterizedType) {
@@