This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 278504a2787 [FLINK-27882][tests][table] Migrate flink-scala to JUnit5
278504a2787 is described below

commit 278504a2787a154faf6f6401028d4bbadafbba0a
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Tue Nov 21 00:20:40 2023 +0100

    [FLINK-27882][tests][table] Migrate flink-scala to JUnit5
---
 flink-scala/pom.xml                                |   6 +
 .../scala/operators/ScalaCsvOutputFormatTest.java  |  47 +-
 .../misc/KryoSerializerRegistrationsTest.java      |  60 +--
 .../org.junit.jupiter.api.extension.Extension      |  16 +
 .../api/scala/DeltaIterationSanityCheckTest.scala  | 115 +++--
 .../apache/flink/api/scala/MaxByOperatorTest.scala |  78 +--
 .../apache/flink/api/scala/MinByOperatorTest.scala |  79 +--
 .../flink/api/scala/SelectByFunctionTest.scala     | 147 +++---
 .../scala/extensions/base/AcceptPFTestBase.scala   |   5 +-
 .../OnCoGroupDataSetTest.scala                     |   2 +-
 .../OnCrossDataSetTest.scala                       |   2 +-
 .../acceptPartialFunctions/OnDataSetTest.scala     |   2 +-
 .../OnGroupedDataSetTest.scala                     |   2 +-
 .../OnHalfUnfinishedKeyPairOperationTest.scala     |   2 +-
 .../OnJoinFunctionAssignerTest.scala               |   2 +-
 .../OnUnfinishedKeyPairOperationTest.scala         |   2 +-
 .../SemanticPropertiesTranslationTest.scala        |  87 ++--
 .../api/scala/io/CollectionInputFormatTest.scala   |  18 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala    | 242 +++++-----
 .../flink/api/scala/metrics/ScalaGaugeTest.scala   |   6 +-
 .../scala/runtime/GenericPairComparatorTest.scala  |   1 -
 .../runtime/KryoGenericTypeSerializerTest.scala    |   2 +-
 .../api/scala/runtime/TupleSerializerTest.scala    |  26 +-
 .../tuple/base/PairComparatorTestBase.scala        |  24 +-
 .../tuple/base/TupleComparatorTestBase.scala       |   2 +-
 .../api/scala/types/TypeInformationGenTest.scala   | 535 ++++++++++-----------
 .../EnumValueSerializerCompatibilityTest.scala     |  53 +-
 .../scala/typeutils/EnumValueSerializerTest.scala  |  13 +-
 .../scala/typeutils/InstantiationUtilTest.scala    |  12 +-
 .../ScalaCaseClassSerializerReflectionTest.scala   |  26 +-
 .../api/scala/typeutils/TypeExtractionTest.scala   |  16 +-
 .../api/scala/typeutils/TypeInfoFactoryTest.scala  |  50 +-
 32 files changed, 813 insertions(+), 867 deletions(-)

diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index c4c801e089d..51969c91142 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -286,6 +286,12 @@ under the License.
                                                <goals>
                                                        <goal>test-jar</goal>
                                                </goals>
+                                               <configuration>
+                                                       <excludes>
+                                                               <!-- test-jar is still used by JUnit4 modules -->
+                                                               <exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+                                                       </excludes>
+                                               </configuration>
                                        </execution>
                                </executions>
                        </plugin>
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
index 4ae751e184d..00ede74adc1 100644
--- a/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -35,19 +33,20 @@ import java.util.List;
 
 import scala.Tuple3;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ScalaCsvOutputFormat}. */
-public class ScalaCsvOutputFormatTest {
+class ScalaCsvOutputFormatTest {
 
     private String path;
     private ScalaCsvOutputFormat<Tuple3<String, String, Integer>> 
csvOutputFormat;
 
-    @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmpFolder;
 
-    @Before
-    public void setUp() throws Exception {
-        path = tmpFolder.newFile().getAbsolutePath();
+    @BeforeEach
+    void setUp() throws Exception {
+        path = tmpFolder.toFile().getAbsolutePath();
         csvOutputFormat = new ScalaCsvOutputFormat<>(new Path(path));
         csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
         
csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
@@ -55,7 +54,7 @@ public class ScalaCsvOutputFormatTest {
     }
 
     @Test
-    public void testNullAllow() throws Exception {
+    void testNullAllow() throws Exception {
         try {
             csvOutputFormat.setAllowNullValues(true);
             csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
@@ -63,22 +62,20 @@ public class ScalaCsvOutputFormatTest {
             csvOutputFormat.close();
         }
         java.nio.file.Path p = Paths.get(path);
-        Assert.assertTrue(Files.exists(p));
+        assertThat(p).exists();
         List<String> lines = Files.readAllLines(Paths.get(path), 
StandardCharsets.UTF_8);
-        Assert.assertEquals(1, lines.size());
-        Assert.assertEquals("One,,8", lines.get(0));
+        assertThat(lines).hasSize(1);
+        assertThat(lines.get(0)).isEqualTo("One,,8");
     }
 
     @Test
-    public void testNullDisallowOnDefault() throws Exception {
-        try {
-            csvOutputFormat.setAllowNullValues(false);
-            csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
-            fail("should fail with an exception");
-        } catch (RuntimeException e) {
-            // expected
-        } finally {
-            csvOutputFormat.close();
-        }
+    void testNullDisallowOnDefault() throws Exception {
+        assertThatThrownBy(
+                        () -> {
+                            csvOutputFormat.setAllowNullValues(false);
+                            csvOutputFormat.writeRecord(new Tuple3<>("One", 
null, 8));
+                        })
+                .isInstanceOf(RuntimeException.class);
+        csvOutputFormat.close();
     }
 }
diff --git a/flink-scala/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java b/flink-scala/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
index d558eb4d21d..286029313c9 100644
--- a/flink-scala/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
+++ b/flink-scala/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
@@ -23,19 +23,13 @@ import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.io.InputStreamReader;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests that the set of Kryo registrations is the same across compatible 
Flink versions.
@@ -44,7 +38,7 @@ import static org.junit.Assert.fail;
  * Kryo serializer itself sits, because when runtime is present in the 
classpath, Chill is used to
  * instantiate Kryo and adds the proper set of registrations.
  */
-public class KryoSerializerRegistrationsTest {
+class KryoSerializerRegistrationsTest {
 
     /**
      * Tests that the registered classes in Kryo did not change.
@@ -53,7 +47,7 @@ public class KryoSerializerRegistrationsTest {
      * change in the serializers can break savepoint backwards compatibility 
between Flink versions.
      */
     @Test
-    public void testDefaultKryoRegisteredClassesDidNotChange() throws 
Exception {
+    void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
         final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
 
         try (BufferedReader reader =
@@ -78,10 +72,9 @@ public class KryoSerializerRegistrationsTest {
                     // only available if flink-avro is present. There is a 
special version of
                     // this test in AvroKryoSerializerRegistrationsTest that 
verifies correct
                     // registration of Avro types if present
-                    assertThat(
-                            registration.getType().getName(),
-                            is(
-                                    
"org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass"));
+                    assertThat(registration.getType().getName())
+                            .isEqualTo(
+                                    
"org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass");
                 } else if 
(!registeredClass.equals(registration.getType().getName())) {
                     fail(
                             String.format(
@@ -91,41 +84,4 @@ public class KryoSerializerRegistrationsTest {
             }
         }
     }
-
-    /**
-     * Creates a Kryo serializer and writes the default registrations out to a 
comma separated file
-     * with one entry per line:
-     *
-     * <pre>
-     * id,class
-     * </pre>
-     *
-     * <p>The produced file is used to check that the registered IDs don't 
change in future Flink
-     * versions.
-     *
-     * <p>This method is not used in the tests, but documents how the test 
file has been created and
-     * can be used to re-create it if needed.
-     *
-     * @param filePath File path to write registrations to
-     */
-    private void writeDefaultKryoRegistrations(String filePath) throws 
IOException {
-        final File file = new File(filePath);
-        if (file.exists()) {
-            assertTrue(file.delete());
-        }
-
-        final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
-        final int nextId = kryo.getNextRegistrationId();
-
-        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) 
{
-            for (int i = 0; i < nextId; i++) {
-                Registration registration = kryo.getRegistration(i);
-                String str = registration.getId() + "," + 
registration.getType().getName();
-                writer.write(str, 0, str.length());
-                writer.newLine();
-            }
-
-            System.out.println("Created file with registrations at " + 
file.getAbsolutePath());
-        }
-    }
 }
diff --git a/flink-scala/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-scala/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..0b74fd4603c
--- /dev/null
+++ b/flink-scala/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index 83732eaaef1..3555e3ec877 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.jupiter.api.Test
 
 // Verify that the sanity checking in delta iterations works. We just
 // have a dummy job that is not meant to be executed. Only verify that
@@ -57,49 +58,60 @@ class DeltaIterationSanityCheckTest extends Serializable {
     iteration.output(new DiscardingOutputFormat[(Int, String)])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectJoinWithSolution1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
 
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
-      (s, ws) =>
-        val result = s.join(ws).where("_2").equalTo("_2")((l, r) => l)
-        (result, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_1")) {
+          (s, ws) =>
+            val result = s.join(ws).where("_2").equalTo("_2")((l, r) => l)
+            (result, ws)
+        }
+
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      })
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectJoinWithSolution2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
 
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
-      (s, ws) =>
-        val result = ws.join(s).where("_2").equalTo("_2")((l, r) => l)
-        (result, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_1")) {
+          (s, ws) =>
+            val result = ws.join(s).where("_2").equalTo("_2")((l, r) => l)
+            (result, ws)
+        }
+
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      })
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectJoinWithSolution3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
-      (s, ws) =>
-        val result = ws.join(s).where("_1").equalTo("_1")((l, r) => l)
-        (result, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_2")) {
+          (s, ws) =>
+            val result = ws.join(s).where("_1").equalTo("_1")((l, r) => l)
+            (result, ws)
+        }
+
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      })
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
   @Test
@@ -132,48 +144,57 @@ class DeltaIterationSanityCheckTest extends Serializable {
     iteration.output(new DiscardingOutputFormat[(Int, String)])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectCoGroupWithSolution1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
 
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
-      (s, ws) =>
-        val result = s.coGroup(ws).where("_2").equalTo("_2")((l, r) => l.min)
-        (result, ws)
-    }
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_1")) {
+          (s, ws) =>
+            val result = s.coGroup(ws).where("_2").equalTo("_2")((l, r) => 
l.min)
+            (result, ws)
+        }
 
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      }).isInstanceOf(classOf[InvalidProgramException])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectCoGroupWithSolution2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
 
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) {
-      (s, ws) =>
-        val result = ws.coGroup(s).where("_2").equalTo("_2")((l, r) => l.min)
-        (result, ws)
-    }
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_1")) {
+          (s, ws) =>
+            val result = ws.coGroup(s).where("_2").equalTo("_2")((l, r) => 
l.min)
+            (result, ws)
+        }
 
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      }).isInstanceOf(classOf[InvalidProgramException])
   }
 
-  @Test(expected = classOf[InvalidProgramException])
+  @Test
   def testIncorrectCoGroupWithSolution3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val solutionInput = env.fromElements((1, "1"))
     val worksetInput = env.fromElements((2, "2"))
 
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) {
-      (s, ws) =>
-        val result = ws.coGroup(s).where("_1").equalTo("_1")((l, r) => l.min)
-        (result, ws)
-    }
+    assertThatThrownBy(
+      () => {
+        val iteration = solutionInput.iterateDelta(worksetInput, 10, 
Array("_2")) {
+          (s, ws) =>
+            val result = ws.coGroup(s).where("_1").equalTo("_1")((l, r) => 
l.min)
+            (result, ws)
+        }
 
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
+        iteration.output(new DiscardingOutputFormat[(Int, String)])
+      }).isInstanceOf(classOf[InvalidProgramException])
   }
 }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
index fd0218fe441..3b70acdfe00 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.InvalidProgramException
 
-import org.junit.Assert
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.jupiter.api.Test
 
 class MaxByOperatorTest {
 
@@ -31,122 +31,124 @@ class MaxByOperatorTest {
   def testMaxByKeyFieldsDataset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
-    try {
-      collection.maxBy(0, 1, 2, 3, 4)
-    } catch {
-      case e: Exception => Assert.fail();
-    }
+    collection.maxBy(0, 1, 2, 3, 4)
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset1() {
+  @Test
+  def testOutOfTupleBoundsDataset1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.maxBy(5)
+    assertThatThrownBy(() => 
collection.maxBy(5)).isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset2() {
+  @Test
+  def testOutOfTupleBoundsDataset2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.maxBy(-1)
+    assertThatThrownBy(() => collection.maxBy(-1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset3() {
+  @Test
+  def testOutOfTupleBoundsDataset3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.maxBy(1, 2, 3, 4, -1)
+    assertThatThrownBy(() => collection.maxBy(1, 2, 3, 4, -1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /** This test validates that no exceptions is thrown when an empty grouping 
calls maxBy(). */
   @Test
-  def testMaxByKeyFieldsGrouping() {
+  def testMaxByKeyFieldsGrouping(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
     // should work
-    try {
-      groupDs.maxBy(4, 0, 1, 2, 3)
-    } catch {
-      case e: Exception => Assert.fail();
-    }
+    groupDs.maxBy(4, 0, 1, 2, 3)
   }
 
   /**
    * This test validates that an InvalidProgramException is thrown when maxBy 
is used on a custom
    * data type.
    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsDataset() {
+  @Test
+  def testCustomKeyFieldsDataset(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val customDS = env.fromCollection(customTypeData)
     // should not work: groups on custom type
-    customDS.maxBy(0)
+    assertThatThrownBy(() => customDS.maxBy(0))
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
   /**
    * This test validates that an InvalidProgramException is thrown when maxBy 
is used on a custom
    * data type.
    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsGrouping() {
+  @Test
+  def testCustomKeyFieldsGrouping(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs: GroupedDataSet[CustomType] = 
env.fromCollection(customTypeData).groupBy(0)
+    assertThatThrownBy(
+      () => {
+        val groupDs: GroupedDataSet[CustomType] = 
env.fromCollection(customTypeData).groupBy(0)
 
-    groupDs.maxBy(0)
+        groupDs.maxBy(0)
+      })
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping1() {
+  @Test
+  def testOutOfTupleBoundsGrouping1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(5)
+    assertThatThrownBy(() => groupDs.maxBy(5))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping2() {
+  @Test
+  def testOutOfTupleBoundsGrouping2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(-1)
+    assertThatThrownBy(() => groupDs.maxBy(-1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping3() {
+  @Test
+  def testOutOfTupleBoundsGrouping3(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(1, 2, 3, 4, -1)
+    assertThatThrownBy(() => groupDs.maxBy(1, 2, 3, 4, -1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
index ac1bb30a76f..f0e8a2201f8 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.InvalidProgramException
 
-import org.junit.Assert
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.jupiter.api.Test
 
 class MinByOperatorTest {
   private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
@@ -30,125 +30,128 @@ class MinByOperatorTest {
   def testMinByKeyFieldsDataset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
-    try {
-      collection.minBy(4, 0, 1, 2, 3)
-    } catch {
-      case e: Exception => Assert.fail();
-    }
+    collection.minBy(4, 0, 1, 2, 3)
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset1() {
+  @Test
+  def testOutOfTupleBoundsDataset1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.minBy(5)
+    assertThatThrownBy(() => collection.minBy(5))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset2() {
+  @Test
+  def testOutOfTupleBoundsDataset2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.minBy(-1)
+    assertThatThrownBy(() => collection.minBy(-1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset3() {
+  @Test
+  def testOutOfTupleBoundsDataset3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val collection = env.fromCollection(emptyTupleData)
 
     // should not work, key out of tuple bounds
-    collection.minBy(1, 2, 3, 4, -1)
+    assertThatThrownBy(() => collection.minBy(1, 2, 3, 4, -1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an InvalidProgramException is thrown when minBy 
is used on a custom
    * data type.
    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsDataset() {
+  @Test
+  def testCustomKeyFieldsDataset(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val customDS = env.fromCollection(customTypeData)
     // should not work: groups on custom type
-    customDS.minBy(0)
+    assertThatThrownBy(() => customDS.minBy(0))
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
   /** This test validates that no exceptions is thrown when an empty grouping 
calls minBy(). */
   @Test
-  def testMinByKeyFieldsGrouping() {
+  def testMinByKeyFieldsGrouping(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
     // should work
-    try {
-      groupDs.minBy(4, 0, 1, 2, 3)
-    } catch {
-      case e: Exception => Assert.fail()
-    }
+    groupDs.minBy(4, 0, 1, 2, 3)
   }
 
   /**
    * This test validates that an InvalidProgramException is thrown when minBy 
is used on a custom
    * data type.
    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsGrouping() {
+  @Test
+  def testCustomKeyFieldsGrouping(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs: GroupedDataSet[CustomType] = 
env.fromCollection(customTypeData).groupBy(0)
+    assertThatThrownBy(
+      () => {
+        val groupDs: GroupedDataSet[CustomType] = 
env.fromCollection(customTypeData).groupBy(0)
 
-    groupDs.minBy(0)
+        groupDs.minBy(0)
+      })
+      .isInstanceOf(classOf[InvalidProgramException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping1() {
+  @Test
+  def testOutOfTupleBoundsGrouping1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
 
-    groupDs.minBy(5)
+    assertThatThrownBy(() => groupDs.minBy(5))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping2() {
+  @Test
+  def testOutOfTupleBoundsGrouping2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
 
-    groupDs.minBy(-1)
+    assertThatThrownBy(() => groupDs.minBy(-1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   /**
    * s This test validates that an index which is out of bounds throws an 
IndexOutOfBoundsException.
    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping3() {
+  @Test
+  def testOutOfTupleBoundsGrouping3(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
 
-    groupDs.minBy(1, 2, 3, 4, -1)
+    assertThatThrownBy(() => groupDs.minBy(1, 2, 3, 4, -1))
+      .isInstanceOf(classOf[IndexOutOfBoundsException])
   }
 
   class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
index 81543f7084e..7da92106733 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 
-import org.junit.{Assert, Test}
+import org.assertj.core.api.Assertions.{assertThat, fail}
+import org.junit.jupiter.api.Test
 
 class SelectByFunctionTest {
 
@@ -44,17 +45,15 @@ class SelectByFunctionTest {
     val a1 = Array(0)
     val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
     try {
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(smaller, bigger))
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(bigger, smaller))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(smaller, bigger))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(bigger, smaller))
     } catch {
       case e: Exception =>
-        Assert.fail("No exception should be thrown while comparing both tuples")
+        fail("No exception should be thrown while comparing both tuples")
     }
   }
 
@@ -71,17 +70,15 @@ class SelectByFunctionTest {
     val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
 
     try {
-      Assert.assertSame(
-        "SelectByMax must return the first given tuple",
-        specialCaseBigger,
-        maxByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame(
-        "SelectByMax must return the first given tuple",
-        bigger,
-        maxByTuple.reduce(bigger, specialCaseBigger))
+      assertThat(specialCaseBigger)
+        .as("SelectByMax must return the first given tuple")
+        .isSameAs(maxByTuple.reduce(specialCaseBigger, bigger))
+      assertThat(bigger)
+        .as("SelectByMax must return the first given tuple")
+        .isSameAs(maxByTuple.reduce(bigger, specialCaseBigger))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown " +
             "while comparing both tuples")
     }
@@ -93,17 +90,15 @@ class SelectByFunctionTest {
     val a1 = Array(0, 2, 1, 4, 3)
     val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
     try {
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(bigger, specialCaseBigger))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(specialCaseBigger, bigger))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(bigger, specialCaseBigger))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown" +
             " while comparing both tuples")
     }
@@ -115,17 +110,15 @@ class SelectByFunctionTest {
     val a1 = Array(0, 1, 2, 3, 4)
     val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
     try {
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(smaller, bigger))
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(bigger, smaller))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(smaller, bigger))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(bigger, smaller))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown " +
             "while comparing both tuples")
     }
@@ -138,17 +131,15 @@ class SelectByFunctionTest {
     val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
 
     try {
-      Assert.assertSame(
-        "SelectByMax must return bigger tuple",
-        bigger,
-        maxByTuple.reduce(bigger, bigger))
-      Assert.assertSame(
-        "SelectByMax must return smaller tuple",
-        smaller,
-        maxByTuple.reduce(smaller, smaller))
+      assertThat(bigger)
+        .as("SelectByMax must return bigger tuple")
+        .isSameAs(maxByTuple.reduce(bigger, bigger))
+      assertThat(smaller)
+        .as("SelectByMax must return smaller tuple")
+        .isSameAs(maxByTuple.reduce(smaller, smaller))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown" +
             " while comparing both tuples")
     }
@@ -165,17 +156,15 @@ class SelectByFunctionTest {
     val a1 = Array(0)
     val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
     try {
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(smaller, bigger))
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(bigger, smaller))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(smaller, bigger))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(bigger, smaller))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown " +
             "while comparing both tuples")
     }
@@ -192,17 +181,15 @@ class SelectByFunctionTest {
     val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
 
     try {
-      Assert.assertSame(
-        "SelectByMin must return the first given tuple",
-        specialCaseBigger,
-        minByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame(
-        "SelectByMin must return the first given tuple",
-        bigger,
-        minByTuple.reduce(bigger, specialCaseBigger))
+      assertThat(specialCaseBigger)
+        .as("SelectByMin must return the first given tuple")
+        .isSameAs(minByTuple.reduce(specialCaseBigger, bigger))
+      assertThat(bigger)
+        .as("SelectByMin must return the first given tuple")
+        .isSameAs(minByTuple.reduce(bigger, specialCaseBigger))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown " +
             "while comparing both tuples")
     }
@@ -218,17 +205,15 @@ class SelectByFunctionTest {
     val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
 
     try {
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(specialCaseSmaller, smaller))
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(smaller, specialCaseSmaller))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(specialCaseSmaller, smaller))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(smaller, specialCaseSmaller))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown" +
             " while comparing both tuples")
     }
@@ -240,17 +225,15 @@ class SelectByFunctionTest {
     val a1 = Array(0, 1, 2, 3, 4)
     val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
     try {
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(smaller, bigger))
-      Assert.assertSame(
-        "SelectByMin must return smaller tuple",
-        smaller,
-        minByTuple.reduce(bigger, smaller))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(smaller, bigger))
+      assertThat(smaller)
+        .as("SelectByMin must return smaller tuple")
+        .isSameAs(minByTuple.reduce(bigger, smaller))
     } catch {
       case e: Exception =>
-        Assert.fail(
+        fail(
           "No exception should be thrown" +
             " while comparing both tuples")
     }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
index bf843c55dc4..ca1f916dc97 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
@@ -19,12 +19,9 @@ package org.apache.flink.api.scala.extensions.base
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
-import org.apache.flink.util.TestLogger
-
-import org.scalatest.junit.JUnitSuiteLike
 
 /** Common facilities to test the `acceptPartialFunctions` extension */
-abstract private[extensions] class AcceptPFTestBase extends TestLogger with JUnitSuiteLike {
+abstract private[extensions] class AcceptPFTestBase {
 
   private val env = ExecutionEnvironment.getExecutionEnvironment
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
index de8faadb592..d6eeadd557f 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnCoGroupDataSetTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
index 27522581cf0..28dd4140526 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnCrossDataSetTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
index 8aed838d8db..428abfa07a8 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnDataSetTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
index 1a8bedda3bc..d5df6dea5a3 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnGroupedDataSetTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
index 3c1ffbe3707..bb137bd5126 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnHalfUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
index 14239cd5ebe..222c1cf98ab 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnJoinFunctionAssignerTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
index 30062ab9f73..42cbf0aabd4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.extensions.acceptPartialFunctions
 import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.api.scala.extensions.data.KeyValuePair
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class OnUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index ff4f3b4182c..e048a13618b 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -28,8 +28,8 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 
-import org.junit.Assert._
-import org.junit.Test
+import org.assertj.core.api.Assertions.{assertThat, fail}
+import org.junit.jupiter.api.Test
 
 /**
  * This is a minimal test to verify that semantic annotations are evaluated against the type
@@ -61,18 +61,17 @@ class SemanticPropertiesTranslationTest {
       val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
       val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
 
-      assertNotNull(fw1)
-      assertNotNull(fw2)
-      assertNotNull(fw3)
-      assertTrue(fw1.contains(0))
-      assertTrue(fw2.contains(1))
-      assertTrue(fw3.contains(2))
+      assertThat(fw1).isNotNull
+      assertThat(fw2).isNotNull
+      assertThat(fw3).isNotNull
+      assertThat(fw1).contains(0)
+      assertThat(fw2).contains(1)
+      assertThat(fw3).contains(2)
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
@@ -98,18 +97,17 @@ class SemanticPropertiesTranslationTest {
       val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
       val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
 
-      assertNotNull(fw1)
-      assertNotNull(fw2)
-      assertNotNull(fw3)
-      assertTrue(fw1.contains(0))
-      assertTrue(fw2.contains(1))
-      assertTrue(fw3.contains(2))
+      assertThat(fw1).isNotNull
+      assertThat(fw2).isNotNull
+      assertThat(fw3).isNotNull
+      assertThat(fw1).contains(0)
+      assertThat(fw2).contains(1)
+      assertThat(fw3).contains(2)
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
@@ -135,18 +133,17 @@ class SemanticPropertiesTranslationTest {
       val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
       val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
 
-      assertNotNull(fw1)
-      assertNotNull(fw2)
-      assertNotNull(fw3)
-      assertTrue(fw1.size == 0)
-      assertTrue(fw3.size == 0)
-      assertTrue(fw2.contains(1))
+      assertThat(fw1).isNotNull
+      assertThat(fw2).isNotNull
+      assertThat(fw3).isNotNull
+      assertThat(fw1).isEmpty()
+      assertThat(fw3).isEmpty()
+      assertThat(fw2).contains(1)
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
@@ -177,20 +174,19 @@ class SemanticPropertiesTranslationTest {
       val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
       val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
 
-      assertNotNull(fw11)
-      assertNotNull(fw21)
-      assertNotNull(fw12)
-      assertNotNull(fw22)
-      assertEquals(0, fw11.size)
-      assertEquals(0, fw22.size)
-      assertTrue(fw12.contains(0))
-      assertTrue(fw21.contains(1))
+      assertThat(fw11).isNotNull
+      assertThat(fw21).isNotNull
+      assertThat(fw12).isNotNull
+      assertThat(fw22).isNotNull
+      assertThat(fw11).isEmpty()
+      assertThat(fw22).isEmpty()
+      assertThat(fw12).contains(0)
+      assertThat(fw21).contains(1)
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
@@ -221,20 +217,19 @@ class SemanticPropertiesTranslationTest {
       val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
       val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
 
-      assertNotNull(fw11)
-      assertNotNull(fw12)
-      assertNotNull(fw21)
-      assertNotNull(fw22)
-      assertTrue(fw11.contains(0))
-      assertTrue(fw12.contains(1))
-      assertTrue(fw21.contains(2))
-      assertTrue(fw22.contains(3))
+      assertThat(fw11).isNotNull
+      assertThat(fw12).isNotNull
+      assertThat(fw21).isNotNull
+      assertThat(fw22).isNotNull
+      assertThat(fw11).contains(0)
+      assertThat(fw12).contains(1)
+      assertThat(fw21).contains(2)
+      assertThat(fw22).contains(3)
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
index 6699a787746..95a2b326f36 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -23,10 +23,8 @@ import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.scala._
 import org.apache.flink.core.io.GenericInputSplit
 
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertTrue
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
@@ -72,8 +70,8 @@ class CollectionInputFormatTest {
     val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
     val serializationResult: AnyRef = in.readObject
 
-    assertNotNull(serializationResult)
-    assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
+    assertThat(serializationResult).isNotNull
+    assertThat(serializationResult).isInstanceOf(classOf[CollectionInputFormat[_]])
 
     val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
     val inputSplit = new GenericInputSplit(0, 1)
@@ -83,7 +81,7 @@ class CollectionInputFormatTest {
     while (!inputFormat.reachedEnd && !result.reachedEnd) {
       val expectedElement = inputFormat.nextRecord(null)
       val actualElement = result.nextRecord(null)
-      assertEquals(expectedElement, actualElement)
+      assertThat(expectedElement).isEqualTo(actualElement)
     }
   }
 
@@ -148,15 +146,15 @@ class CollectionInputFormatTest {
     val ois = new ObjectInputStream(bais)
     val result: AnyRef = ois.readObject
 
-    assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
+    assertThat(result).isInstanceOf(classOf[CollectionInputFormat[_]])
     var i: Int = 0
     val in = result.asInstanceOf[CollectionInputFormat[String]]
     in.open(new GenericInputSplit(0, 1))
 
     while (!in.reachedEnd) {
-      assertEquals(data(i), in.nextRecord(""))
+      assertThat(data(i)).isEqualTo(in.nextRecord(""))
       i += 1
     }
-    assertEquals(data.length, i)
+    assertThat(data.length).isEqualTo(i)
   }
 }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 754ec1f9b81..e7547bf420e 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -26,8 +26,8 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileInputSplit, Path}
 
-import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
-import org.junit.Test
+import org.assertj.core.api.Assertions.{assertThat, fail, offset}
+import org.junit.jupiter.api.Test
 
 import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
 
@@ -60,23 +60,22 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (String, Integer, Double) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("this is", result._1)
-      assertEquals(new Integer(1), result._2)
-      assertEquals(2.0, result._3, 0.0001)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("this is")
+      assertThat(result._2).isEqualTo(new Integer(1))
+      assertThat(result._3).isEqualTo(2.0, offset(0.0001))
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("a test", result._1)
-      assertEquals(new Integer(3), result._2)
-      assertEquals(4.0, result._3, 0.0001)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("a test")
+      assertThat(result._2).isEqualTo(new Integer(3))
+      assertThat(result._3).isEqualTo(4.0, offset(0.0001))
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
-      case ex: Exception => {
-        ex.printStackTrace
+      case ex: Exception =>
+        ex.printStackTrace()
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-      }
     }
   }
 
@@ -101,23 +100,22 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (String, Integer, Double) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("this is", result._1)
-      assertEquals(new Integer(1), result._2)
-      assertEquals(2.0, result._3, 0.0001)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("this is")
+      assertThat(result._2).isEqualTo(new Integer(1))
+      assertThat(result._3).isEqualTo(2.0, offset(0.0001))
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("a test", result._1)
-      assertEquals(new Integer(3), result._2)
-      assertEquals(4.0, result._3, 0.0001)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("a test")
+      assertThat(result._2).isEqualTo(new Integer(3))
+      assertThat(result._3).isEqualTo(4.0, offset(0.0001))
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
-      case ex: Exception => {
-        ex.printStackTrace
+      case ex: Exception =>
+        ex.printStackTrace()
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-      }
     }
   }
 
@@ -137,28 +135,27 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (String, String, String) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("def", result._2)
-      assertEquals("ghijk", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("abc")
+      assertThat(result._2).isEqualTo("def")
+      assertThat(result._3).isEqualTo("ghijk")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("", result._2)
-      assertEquals("hhg", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("abc")
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEqualTo("hhg")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("", result._1)
-      assertEquals("", result._2)
-      assertEquals("", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEmpty()
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEmpty()
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
-      case ex: Exception => {
+      case ex: Exception =>
         ex.printStackTrace()
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-      }
     }
   }
 
@@ -179,28 +176,27 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (String, String, String) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("de|f", result._2)
-      assertEquals("ghijk", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("abc")
+      assertThat(result._2).isEqualTo("de|f")
+      assertThat(result._3).isEqualTo("ghijk")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("a|bc", result._1)
-      assertEquals("", result._2)
-      assertEquals("hhg", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("a|bc")
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEqualTo("hhg")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("", result._1)
-      assertEquals("", result._2)
-      assertEquals("", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEmpty()
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEmpty()
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
-      case ex: Exception => {
+      case ex: Exception =>
         ex.printStackTrace()
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-      }
     }
   }
 
@@ -220,23 +216,23 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (String, String, String) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("def", result._2)
-      assertEquals("ghijk", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("abc")
+      assertThat(result._2).isEqualTo("def")
+      assertThat(result._3).isEqualTo("ghijk")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("", result._2)
-      assertEquals("hhg", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo("abc")
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEqualTo("hhg")
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("", result._1)
-      assertEquals("", result._2)
-      assertEquals("", result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEmpty()
+      assertThat(result._2).isEmpty()
+      assertThat(result._3).isEmpty()
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
       case ex: Exception =>
         ex.printStackTrace()
@@ -258,22 +254,22 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (Int, Int, Int, Int, Int) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
-      assertEquals(Integer.valueOf(444), result._4)
-      assertEquals(Integer.valueOf(555), result._5)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(111))
+      assertThat(result._2).isEqualTo(Integer.valueOf(222))
+      assertThat(result._3).isEqualTo(Integer.valueOf(333))
+      assertThat(result._4).isEqualTo(Integer.valueOf(444))
+      assertThat(result._5).isEqualTo(Integer.valueOf(555))
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
-      assertEquals(Integer.valueOf(999), result._4)
-      assertEquals(Integer.valueOf(0), result._5)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(666))
+      assertThat(result._2).isEqualTo(Integer.valueOf(777))
+      assertThat(result._3).isEqualTo(Integer.valueOf(888))
+      assertThat(result._4).isEqualTo(Integer.valueOf(999))
+      assertThat(result._5).isEqualTo(Integer.valueOf(0))
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
       case ex: Exception =>
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
@@ -294,16 +290,16 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (Int, Int) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(111))
+      assertThat(result._2).isEqualTo(Integer.valueOf(222))
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(666))
+      assertThat(result._2).isEqualTo(Integer.valueOf(777))
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
       case ex: Exception =>
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
@@ -324,18 +320,18 @@ class CsvInputFormatTest {
       format.open(split)
       var result: (Int, Int, Int) = null
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(444), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(111))
+      assertThat(result._2).isEqualTo(Integer.valueOf(444))
+      assertThat(result._3).isEqualTo(Integer.valueOf(888))
       result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(0), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
+      assertThat(result).isNotNull
+      assertThat(result._1).isEqualTo(Integer.valueOf(0))
+      assertThat(result._2).isEqualTo(Integer.valueOf(777))
+      assertThat(result._3).isEqualTo(Integer.valueOf(333))
       result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
+      assertThat(result).isNull()
+      assertThat(format.reachedEnd).isTrue
     } catch {
       case ex: Exception =>
         fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
@@ -363,7 +359,7 @@ class CsvInputFormatTest {
     this.testRemovingTrailingCR("\r\n", "\n")
   }
 
-  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
+  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String): Unit = {
     var tempFile: File = null
     val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
     try {
@@ -382,11 +378,11 @@ class CsvInputFormatTest {
       val splits = inputFormat.createInputSplits(1)
       inputFormat.open(splits(0))
       var result = inputFormat.nextRecord(null)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(FIRST_PART, result._1)
+      assertThat(result).as("Expecting to not return null").isNotNull
+      assertThat(result._1).isEqualTo(FIRST_PART)
       result = inputFormat.nextRecord(result)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(SECOND_PART, result._1)
+      assertThat(result).as("Expecting to not return null").isNotNull
+      assertThat(result._1).isEqualTo(SECOND_PART)
     } catch {
       case t: Throwable =>
         System.err.println("test failed with exception: " + t.getMessage)
@@ -404,26 +400,26 @@ class CsvInputFormatTest {
   private def validatePOJOItem(format: PojoCsvInputFormat[POJOItem]): Unit = {
     var result = new POJOItem()
     result = format.nextRecord(result)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
+    assertThat(result.field1).isEqualTo(123)
+    assertThat(result.field2).isEqualTo("HELLO")
+    assertThat(result.field3).isEqualTo(3.123, offset(0.001))
 
     result = format.nextRecord(result)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
+    assertThat(result.field1).isEqualTo(456)
+    assertThat(result.field2).isEqualTo("ABC")
+    assertThat(result.field3).isEqualTo(1.234, offset(0.001))
   }
 
   private def validateCaseClassItem(format: TupleCsvInputFormat[CaseClassItem]): Unit = {
     var result = format.nextRecord(null)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
+    assertThat(result.field1).isEqualTo(123)
+    assertThat(result.field2).isEqualTo("HELLO")
+    assertThat(result.field3).isEqualTo(3.123, offset(0.001))
 
     result = format.nextRecord(null)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
+    assertThat(result.field1).isEqualTo(456)
+    assertThat(result.field2).isEqualTo("ABC")
+    assertThat(result.field3).isEqualTo(1.234, offset(0.001))
   }
 
   @Test
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
index 5c72c4f0e0b..496d5563a27 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
@@ -17,11 +17,9 @@
  */
 package org.apache.flink.api.scala.metrics
 
-import org.apache.flink.util.TestLogger
+import org.junit.jupiter.api.Test
 
-import org.junit.Test
-
-class ScalaGaugeTest extends TestLogger {
+class ScalaGaugeTest {
 
   @Test
   def testGaugeCorrectValue(): Unit = {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
index afe7e928e5a..a5c105778d2 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
@@ -19,7 +19,6 @@ package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.common.typeutils.{GenericPairComparator, TypeComparator, TypeSerializer}
 import org.apache.flink.api.common.typeutils.base.{DoubleComparator, DoubleSerializer, IntComparator, IntSerializer}
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator
 import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
 import org.apache.flink.api.scala.typeutils.CaseClassComparator
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index e12b2869e3b..1de51c72343 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.joda.time.LocalDate
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import scala.reflect._
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index 530269b017e..857593d917b 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -25,9 +25,9 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.util.StringUtils
 
+import org.assertj.core.api.Assertions.{assertThat, fail}
 import org.joda.time.LocalDate
-import org.junit.{Assert, Test}
-import org.junit.Assert._
+import org.junit.jupiter.api.Test
 
 import java.util
 import java.util.Random
@@ -49,14 +49,17 @@ class TupleSerializerTest {
     duplicateSerializer.getFieldSerializers
 
     // the list of child serializers must be duplicated
-    assertTrue(duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers)
+    assertThat(
+      duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers).isTrue
 
     // each of the child serializers (which are themselves CaseClassSerializers) must be duplicated
-    assertTrue(
-      duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(0))
+    assertThat(
+      duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(
+        0)).isTrue
 
-    assertTrue(
-      duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(1))
+    assertThat(
+      duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(
+        1)).isTrue
   }
 
   @Test
@@ -259,7 +262,9 @@ class TupleSerializerTest {
     runTests(testTuples, -1)
   }
 
-  final private def runTests[T <: Product: TypeInformation](instances: Array[T], length: Int) {
+  final private def runTests[T <: Product: TypeInformation](
+      instances: Array[T],
+      length: Int): Unit = {
     try {
       // Register the custom Kryo Serializer
       val conf = new ExecutionConfig()
@@ -271,11 +276,10 @@ class TupleSerializerTest {
       val test = new TupleSerializerTestInstance[T](serializer, tupleClass, length, instances)
       test.testAll()
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
-        Assert.fail(e.getMessage)
-      }
+        fail(e.getMessage)
     }
   }
 }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
index 77f4ba0bb7d..0227935ba7b 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
@@ -18,14 +18,12 @@
 package org.apache.flink.api.scala.runtime.tuple.base
 
 import org.apache.flink.api.common.typeutils.TypePairComparator
-import org.apache.flink.util.TestLogger
 
-import org.junit.Assert.assertTrue
-import org.junit.Assert.fail
-import org.junit.Test
+import org.assertj.core.api.Assertions.{assertThat, fail}
+import org.junit.jupiter.api.Test
 
 /** Abstract test base for PairComparators. */
-abstract class PairComparatorTestBase[T, R] extends TestLogger {
+abstract class PairComparatorTestBase[T, R] {
   protected def createComparator(ascending: Boolean): TypePairComparator[T, R]
 
   protected def getSortedTestData: (Array[T], Array[R])
@@ -36,16 +34,15 @@ abstract class PairComparatorTestBase[T, R] extends TestLogger {
       val comparator = getComparator(ascending = true)
 
       val (dataT, dataR) = getSortedData
-      for (i <- 0 until dataT.length) {
+      for (i <- dataT.indices) {
         comparator.setReference(dataT(i))
-        assertTrue(comparator.equalToReference(dataR(i)))
+        assertThat(comparator.equalToReference(dataR(i))).isTrue
       }
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
@@ -55,7 +52,7 @@ abstract class PairComparatorTestBase[T, R] extends TestLogger {
     testGreatSmallAscDescWithReference(ascending = false)
   }
 
-  protected def testGreatSmallAscDescWithReference(ascending: Boolean) {
+  protected def testGreatSmallAscDescWithReference(ascending: Boolean): Unit = {
     try {
       val (dataT, dataR) = getSortedData
       val comparator = getComparator(ascending)
@@ -63,18 +60,17 @@ abstract class PairComparatorTestBase[T, R] extends TestLogger {
         for (y <- (x + 1) until dataR.length) {
           comparator.setReference(dataT(x))
           if (ascending) {
-            assertTrue(comparator.compareToReference(dataR(y)) > 0)
+            assertThat(comparator.compareToReference(dataR(y))).isGreaterThan(0)
           } else {
-            assertTrue(comparator.compareToReference(dataR(y)) < 0)
+            assertThat(comparator.compareToReference(dataR(y))).isLessThan(0)
           }
         }
       }
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
index e86b0c0d492..2e9bbfe39c2 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.ComparatorTestBase
 import org.assertj.core.api.AssertionsForClassTypes.assertThat
 
 abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {
-  override protected def deepEquals(message: String, should: T, is: T) {
+  override protected def deepEquals(message: String, should: T, is: T): Unit = {
     for (i <- 0 until should.productArity) {
       assertThat(is.productElement(i)).isEqualTo(should.productElement(i))
     }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 00f4a9bbb90..324c037e606 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -27,7 +27,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TraversableSerializer, UnitTypeInfo}
 import org.apache.flink.types.{IntValue, StringValue}
 
-import org.junit.{Assert, Test}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
 @SerialVersionUID(-1509730037212683566L)
 case class CustomCaseClass(a: String, b: Int)
@@ -50,44 +51,44 @@ class TypeInformationGenTest {
   def testJavaTuple(): Unit = {
     val ti = createTypeInformation[org.apache.flink.api.java.tuple.Tuple3[Int, String, Integer]]
 
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(3, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(3)
+    assertThat(ti).isInstanceOf(classOf[TupleTypeInfoBase[_]])
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[org.apache.flink.api.java.tuple.Tuple3[_, _, _]], tti.getTypeClass)
+    assertThat(tti.getTypeClass).isEqualTo(classOf[org.apache.flink.api.java.tuple.Tuple3[_, _, _]])
     for (i <- 0 until 3) {
-      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+      assertThat(tti.getTypeAt(i)).isInstanceOf(classOf[BasicTypeInfo[_]])
     }
 
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(2))
+    assertThat(tti.getTypeAt(0)).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(tti.getTypeAt(1)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(tti.getTypeAt(2)).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
   }
 
   @Test
   def testCustomJavaTuple(): Unit = {
     val ti = createTypeInformation[CustomTuple]
 
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(2, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(2)
+    assertThat(ti).isInstanceOf(classOf[TupleTypeInfoBase[_]])
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[CustomTuple], tti.getTypeClass)
+    assertThat(tti.getTypeClass).isEqualTo(classOf[CustomTuple])
     for (i <- 0 until 2) {
-      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+      assertThat(tti.getTypeAt(i)).isInstanceOf(classOf[BasicTypeInfo[_]])
     }
 
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(1))
+    assertThat(tti.getTypeAt(0)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(tti.getTypeAt(1)).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
   }
 
   @Test
   def testBasicType(): Unit = {
     val ti = createTypeInformation[Boolean]
 
-    Assert.assertTrue(ti.isBasicType)
-    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti)
-    Assert.assertEquals(classOf[java.lang.Boolean], ti.getTypeClass)
+    assertThat(ti.isBasicType).isTrue
+    assertThat(ti).isEqualTo(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    assertThat(ti.getTypeClass).isEqualTo(classOf[java.lang.Boolean])
   }
 
   @Test
@@ -99,9 +100,9 @@ class TypeInformationGenTest {
 
       val ti = createTypeInformation[(T, Seq[T])]
 
-      Assert.assertTrue(ti.isTupleType)
+      assertThat(ti.isTupleType).isTrue
       val ccti = ti.asInstanceOf[CaseClassTypeInfo[(T, Seq[T])]]
-      Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, ccti.getTypeAt(0))
+      assertThat(ccti.getTypeAt(0)).isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
 
       (data.head, data)
     }
@@ -132,177 +133,167 @@ class TypeInformationGenTest {
       createTypeInformation[Array[T]]
     }
 
-    Assert.assertEquals(
-      PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
-      getType(boolArray))
+    assertThat(getType(boolArray)).isEqualTo(
+      PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+    )
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, getType(byteArray))
+    assertThat(getType(byteArray)).isEqualTo(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, getType(charArray))
+    assertThat(getType(charArray)).isEqualTo(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, getType(shortArray))
+    assertThat(getType(shortArray)).isEqualTo(
+      PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, getType(intArray))
+    assertThat(getType(intArray)).isEqualTo(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, getType(longArray))
+    assertThat(getType(longArray)).isEqualTo(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, getType(floatArray))
+    assertThat(getType(floatArray)).isEqualTo(
+      PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(
-      PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
-      getType(doubleArray))
+    assertThat(getType(doubleArray)).isEqualTo(
+      PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, getType(stringArray))
+    assertThat(getType(stringArray)).isEqualTo(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
 
-    Assert.assertTrue(getType(objectArray).isInstanceOf[ObjectArrayTypeInfo[_, _]])
-    Assert.assertTrue(
+    assertThat(getType(objectArray)).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
+    assertThat(
       getType(objectArray)
         .asInstanceOf[ObjectArrayTypeInfo[_, _]]
         .getComponentInfo
-        .isInstanceOf[PojoTypeInfo[_]])
+        .isInstanceOf[PojoTypeInfo[_]]).isTrue
   }
 
   @Test
   def testTupleWithBasicTypes(): Unit = {
     val ti = createTypeInformation[(Int, Long, Double, Float, Boolean, String, Char, Short, Byte)]
 
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(9, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(9)
+    assertThat(ti).isInstanceOf(classOf[TupleTypeInfoBase[_]])
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple9[_, _, _, _, _, _, _, _, _]], tti.getTypeClass)
+    assertThat(tti.getTypeClass).isEqualTo(classOf[(_, _, _, _, _, _, _, _, _)])
     for (i <- 0 until 9) {
-      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+      assertThat(tti.getTypeAt(i)).isInstanceOf(classOf[BasicTypeInfo[_]])
     }
 
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1))
-    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2))
-    Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(3))
-    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(4))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(5))
-    Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(6))
-    Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti.getTypeAt(7))
-    Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti.getTypeAt(8))
+    assertThat(tti.getTypeAt(0)).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(tti.getTypeAt(1)).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
+    assertThat(tti.getTypeAt(2)).isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
+    assertThat(tti.getTypeAt(3)).isEqualTo(BasicTypeInfo.FLOAT_TYPE_INFO)
+    assertThat(tti.getTypeAt(4)).isEqualTo(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    assertThat(tti.getTypeAt(5)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(tti.getTypeAt(6)).isEqualTo(BasicTypeInfo.CHAR_TYPE_INFO)
+    assertThat(tti.getTypeAt(7)).isEqualTo(BasicTypeInfo.SHORT_TYPE_INFO)
+    assertThat(tti.getTypeAt(8)).isEqualTo(BasicTypeInfo.BYTE_TYPE_INFO)
   }
 
   @Test
   def testTupleWithTuples(): Unit = {
     val ti = createTypeInformation[(Tuple1[String], Tuple1[Int], Tuple2[Long, Long])]
 
-    Assert.assertTrue(ti.isTupleType())
-    Assert.assertEquals(3, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(3)
+    assertThat(ti).isInstanceOf(classOf[TupleTypeInfoBase[_]])
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple3[_, _, _]], tti.getTypeClass)
-    Assert.assertTrue(tti.getTypeAt(0).isTupleType())
-    Assert.assertTrue(tti.getTypeAt(1).isTupleType())
-    Assert.assertTrue(tti.getTypeAt(2).isTupleType())
-    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(0).getTypeClass)
-    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(1).getTypeClass)
-    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeAt(2).getTypeClass)
-    Assert.assertEquals(1, tti.getTypeAt(0).getArity)
-    Assert.assertEquals(1, tti.getTypeAt(1).getArity)
-    Assert.assertEquals(2, tti.getTypeAt(2).getArity)
-    Assert.assertEquals(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      tti.getTypeAt(0).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(
-      BasicTypeInfo.INT_TYPE_INFO,
-      tti.getTypeAt(1).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+    assertThat(tti.getTypeClass).isEqualTo(classOf[(_, _, _)])
+    assertThat(tti.getTypeAt(0).isTupleType).isTrue
+    assertThat(tti.getTypeAt(1).isTupleType).isTrue
+    assertThat(tti.getTypeAt(2).isTupleType).isTrue
+    assertThat(tti.getTypeAt(0).getTypeClass).isEqualTo(classOf[Tuple1[_]])
+    assertThat(tti.getTypeAt(1).getTypeClass).isEqualTo(classOf[Tuple1[_]])
+    assertThat(tti.getTypeAt(2).getTypeClass).isEqualTo(classOf[(_, _)])
+    assertThat(tti.getTypeAt(0).getArity).isEqualTo(1)
+    assertThat(tti.getTypeAt(1).getArity).isEqualTo(1)
+    assertThat(tti.getTypeAt(2).getArity).isEqualTo(2)
+    assertThat(tti.getTypeAt(0).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+      .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(tti.getTypeAt(1).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+      .isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+      .isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
+    assertThat(tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+      .isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
   }
 
   @Test
   def testCaseClass(): Unit = {
     val ti = createTypeInformation[CustomCaseClass]
 
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(2, ti.getArity)
-    Assert.assertEquals(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(
-      BasicTypeInfo.INT_TYPE_INFO,
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
-    Assert.assertEquals(
-      classOf[CustomCaseClass],
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeClass())
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(2)
+    assertThat(ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+      .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+      .isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeClass)
+      .isEqualTo(classOf[CustomCaseClass])
   }
 
   @Test
   def testCustomType(): Unit = {
     val ti = createTypeInformation[CustomType]
 
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
-    Assert.assertEquals(ti.getTypeClass, classOf[CustomType])
+    assertThat(ti.isBasicType).isFalse
+    assertThat(ti.isTupleType).isFalse
+    assertThat(ti).isInstanceOf(classOf[PojoTypeInfo[_]])
+    assertThat(ti.getTypeClass).isEqualTo(classOf[CustomType])
   }
 
   @Test
   def testTupleWithCustomType(): Unit = {
     val ti = createTypeInformation[(Long, CustomType)]
 
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(2, ti.getArity)
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.getArity).isEqualTo(2)
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass)
-    Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass)
-    Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]])
-    Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass)
+    assertThat(tti.getTypeClass).isEqualTo(classOf[(_, _)])
+    assertThat(tti.getTypeAt(0).getTypeClass).isEqualTo(classOf[java.lang.Long])
+    assertThat(tti.getTypeAt(1)).isInstanceOf(classOf[PojoTypeInfo[_]])
+    assertThat(tti.getTypeAt(1).getTypeClass).isEqualTo(classOf[CustomType])
   }
 
   @Test
   def testValue(): Unit = {
     val ti = createTypeInformation[StringValue]
 
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[ValueTypeInfo[_]])
-    Assert.assertEquals(ti.getTypeClass, classOf[StringValue])
-    Assert.assertTrue(
+    assertThat(ti.isBasicType).isFalse
+    assertThat(ti.isTupleType).isFalse
+    assertThat(ti).isInstanceOf(classOf[ValueTypeInfo[_]])
+    assertThat(ti.getTypeClass).isEqualTo(classOf[StringValue])
+    assertThat(
       TypeExtractor
-        .getForClass(classOf[StringValue])
-        .isInstanceOf[ValueTypeInfo[_]])
-    Assert.assertEquals(
-      TypeExtractor.getForClass(classOf[StringValue]).getTypeClass,
-      ti.getTypeClass)
+        .getForClass(classOf[StringValue]))
+      .isInstanceOf(classOf[ValueTypeInfo[_]])
+    assertThat(TypeExtractor.getForClass(classOf[StringValue]).getTypeClass)
+      .isEqualTo(ti.getTypeClass)
   }
 
   @Test
   def testTupleOfValues(): Unit = {
     val ti = createTypeInformation[(StringValue, IntValue)]
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(
-      classOf[StringValue],
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0).getTypeClass)
-    Assert.assertEquals(
-      classOf[IntValue],
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1).getTypeClass)
+    assertThat(ti.isBasicType).isFalse
+    assertThat(ti.isTupleType).isTrue
+    assertThat(ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0).getTypeClass)
+      .isEqualTo(classOf[StringValue])
+    assertThat(ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1).getTypeClass)
+      .isEqualTo(classOf[IntValue])
   }
 
   @Test
   def testBasicArray(): Unit = {
     val ti = createTypeInformation[Array[String]]
 
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(
-      ti.isInstanceOf[BasicArrayTypeInfo[_, _]] ||
-        ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    assertThat(ti.isBasicType).isFalse
+    assertThat(ti.isTupleType).isFalse
+    assertThat(ti).isInstanceOfAny(
+      classOf[BasicArrayTypeInfo[_, _]],
+      classOf[ObjectArrayTypeInfo[_, _]])
     if (ti.isInstanceOf[BasicArrayTypeInfo[_, _]]) {
-      Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, ti)
+      assertThat(ti).isEqualTo(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
     } else {
-      Assert.assertEquals(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo)
+      assertThat(ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo)
+        .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
     }
   }
 
@@ -310,29 +301,28 @@ class TypeInformationGenTest {
   def testPrimitiveArray(): Unit = {
     val ti = createTypeInformation[Array[Boolean]]
 
-    Assert.assertTrue(ti.isInstanceOf[PrimitiveArrayTypeInfo[_]])
-    Assert.assertEquals(ti, 
PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
+    assertThat(ti).isInstanceOf(classOf[PrimitiveArrayTypeInfo[_]])
+    
assertThat(ti).isEqualTo(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
   }
 
   @Test
   def testCustomArray(): Unit = {
     val ti = createTypeInformation[Array[CustomType]]
-    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
-    Assert.assertEquals(
-      classOf[CustomType],
-      ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo.getTypeClass)
+    assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
+    assertThat(ti.asInstanceOf[ObjectArrayTypeInfo[_, 
_]].getComponentInfo.getTypeClass)
+      .isEqualTo(classOf[CustomType])
   }
 
   @Test
   def testTupleArray(): Unit = {
     val ti = createTypeInformation[Array[(String, String)]]
 
-    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
     val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-    Assert.assertTrue(oati.getComponentInfo.isTupleType)
+    assertThat(oati.getComponentInfo.isTupleType).isTrue
     val tti = oati.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+    assertThat(tti.getTypeAt(0)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(tti.getTypeAt(1)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
   }
 
   @Test
@@ -341,24 +331,23 @@ class TypeInformationGenTest {
     {
       val ti = createTypeInformation[Array[Array[(String, String)]]]
 
-      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
       val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-      
Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      
assertThat(oati.getComponentInfo).isInstanceOf(classOf[ObjectArrayTypeInfo[_, 
_]])
       val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-      Assert.assertTrue(oati2.getComponentInfo.isTupleType)
+      assertThat(oati2.getComponentInfo.isTupleType).isTrue
       val tti = oati2.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
-      Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
-      Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+      assertThat(tti.getTypeAt(0)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+      assertThat(tti.getTypeAt(1)).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
     }
 
     // primitives
     {
       val ti = createTypeInformation[Array[Array[Int]]]
 
-      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
       val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-      Assert.assertEquals(
-        oati.getComponentInfo,
+      assertThat(oati.getComponentInfo).isEqualTo(
         PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
     }
 
@@ -366,21 +355,21 @@ class TypeInformationGenTest {
     {
       val ti = createTypeInformation[Array[Array[Integer]]]
 
-      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
       val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-      Assert.assertEquals(oati.getComponentInfo, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
+      
assertThat(oati.getComponentInfo).isEqualTo(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
     }
 
     // pojo
     {
       val ti = createTypeInformation[Array[Array[CustomType]]]
 
-      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      assertThat(ti).isInstanceOf(classOf[ObjectArrayTypeInfo[_, _]])
       val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-      
Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      
assertThat(oati.getComponentInfo).isInstanceOf(classOf[ObjectArrayTypeInfo[_, 
_]])
       val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
       val tti = oati2.getComponentInfo.asInstanceOf[PojoTypeInfo[_]]
-      Assert.assertEquals(classOf[CustomType], tti.getTypeClass())
+      assertThat(tti.getTypeClass).isEqualTo(classOf[CustomType])
     }
   }
 
@@ -388,7 +377,7 @@ class TypeInformationGenTest {
   def testParamertizedCustomObject(): Unit = {
     val ti = createTypeInformation[MyObject[String]]
 
-    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
+    assertThat(ti).isInstanceOf(classOf[PojoTypeInfo[_]])
   }
 
   @Test
@@ -405,15 +394,15 @@ class TypeInformationGenTest {
         Array[String])]
 
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(0))
-    
Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(1))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(2))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(3))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(4))
-    
Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(5))
-    
Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(6))
-    
Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(7))
-    Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, 
tti.getTypeAt(8))
+    
assertThat(tti.getTypeAt(0)).isEqualTo(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(1)).isEqualTo(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(2)).isEqualTo(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(3)).isEqualTo(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(4)).isEqualTo(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(5)).isEqualTo(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(6)).isEqualTo(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(7)).isEqualTo(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
+    
assertThat(tti.getTypeAt(8)).isEqualTo(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
   }
 
   @Test
@@ -425,7 +414,7 @@ class TypeInformationGenTest {
 
     val ti = createTypeInformation[TestTrait]
 
-    Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[TestTrait]])
+    assertThat(ti).isInstanceOf(classOf[GenericTypeInfo[TestTrait]])
   }
 
   @Test
@@ -433,92 +422,88 @@ class TypeInformationGenTest {
 
     val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)]
       .asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
-    Assert.assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition)
-    Assert.assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition)
-    Assert.assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition)
-    Assert.assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition)
-    Assert.assertEquals(0, 
tupleTypeInfo.getFlatFields("_1").get(0).getPosition)
-    Assert.assertEquals(1, 
tupleTypeInfo.getFlatFields("_2").get(0).getPosition)
-    Assert.assertEquals(2, 
tupleTypeInfo.getFlatFields("_3").get(0).getPosition)
-    Assert.assertEquals(3, 
tupleTypeInfo.getFlatFields("_4").get(0).getPosition)
+    
assertThat(tupleTypeInfo.getFlatFields("0").get(0).getPosition).isEqualTo(0)
+    
assertThat(tupleTypeInfo.getFlatFields("1").get(0).getPosition).isEqualTo(1)
+    
assertThat(tupleTypeInfo.getFlatFields("2").get(0).getPosition).isEqualTo(2)
+    
assertThat(tupleTypeInfo.getFlatFields("3").get(0).getPosition).isEqualTo(3)
+    
assertThat(tupleTypeInfo.getFlatFields("_1").get(0).getPosition).isEqualTo(0)
+    
assertThat(tupleTypeInfo.getFlatFields("_2").get(0).getPosition).isEqualTo(1)
+    
assertThat(tupleTypeInfo.getFlatFields("_3").get(0).getPosition).isEqualTo(2)
+    
assertThat(tupleTypeInfo.getFlatFields("_4").get(0).getPosition).isEqualTo(3)
 
     val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, 
(Double, Double))]
       .asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, 
Double))]]
-    Assert.assertEquals(0, 
nestedTypeInfo.getFlatFields("0").get(0).getPosition)
-    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1.0").get(0).getPosition)
-    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1.1").get(0).getPosition)
-    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1.2").get(0).getPosition)
-    Assert.assertEquals(4, 
nestedTypeInfo.getFlatFields("2").get(0).getPosition)
-    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("3.0").get(0).getPosition)
-    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("3.1").get(0).getPosition)
-    Assert.assertEquals(4, 
nestedTypeInfo.getFlatFields("_3").get(0).getPosition)
-    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition)
-    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").size)
-    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1").get(0).getPosition)
-    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1").get(1).getPosition)
-    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1").get(2).getPosition)
-    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size)
-    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1.*").get(0).getPosition)
-    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1.*").get(1).getPosition)
-    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1.*").get(2).getPosition)
-    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("3").size)
-    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("3").get(0).getPosition)
-    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("3").get(1).getPosition)
-    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").size)
-    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("_2").get(0).getPosition)
-    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("_2").get(1).getPosition)
-    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("_2").get(2).getPosition)
-    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_4").size)
-    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("_4").get(0).getPosition)
-    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("_4").get(1).getPosition)
-    Assert.assertEquals(
-      BasicTypeInfo.INT_TYPE_INFO,
-      nestedTypeInfo.getFlatFields("0").get(0).getType)
-    Assert.assertEquals(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      nestedTypeInfo.getFlatFields("1.1").get(0).getType)
-    Assert.assertEquals(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      nestedTypeInfo.getFlatFields("1").get(2).getType)
-    Assert.assertEquals(
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      nestedTypeInfo.getFlatFields("3").get(1).getType)
+    
assertThat(nestedTypeInfo.getFlatFields("0").get(0).getPosition).isEqualTo(0)
+    
assertThat(nestedTypeInfo.getFlatFields("1.0").get(0).getPosition).isEqualTo(1)
+    
assertThat(nestedTypeInfo.getFlatFields("1.1").get(0).getPosition).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("1.2").get(0).getPosition).isEqualTo(3)
+    
assertThat(nestedTypeInfo.getFlatFields("2").get(0).getPosition).isEqualTo(4)
+    
assertThat(nestedTypeInfo.getFlatFields("3.0").get(0).getPosition).isEqualTo(5)
+    
assertThat(nestedTypeInfo.getFlatFields("3.1").get(0).getPosition).isEqualTo(6)
+    
assertThat(nestedTypeInfo.getFlatFields("_3").get(0).getPosition).isEqualTo(4)
+    
assertThat(nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition).isEqualTo(5)
+    assertThat(nestedTypeInfo.getFlatFields("1").size()).isEqualTo(3)
+    
assertThat(nestedTypeInfo.getFlatFields("1").get(0).getPosition).isEqualTo(1)
+    
assertThat(nestedTypeInfo.getFlatFields("1").get(1).getPosition).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("1").get(2).getPosition).isEqualTo(3)
+    assertThat(nestedTypeInfo.getFlatFields("1.*").size()).isEqualTo(3)
+    
assertThat(nestedTypeInfo.getFlatFields("1.*").get(0).getPosition).isEqualTo(1)
+    
assertThat(nestedTypeInfo.getFlatFields("1.*").get(1).getPosition).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("1.*").get(2).getPosition).isEqualTo(3)
+    assertThat(nestedTypeInfo.getFlatFields("3").size()).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("3").get(0).getPosition).isEqualTo(5)
+    
assertThat(nestedTypeInfo.getFlatFields("3").get(1).getPosition).isEqualTo(6)
+    assertThat(nestedTypeInfo.getFlatFields("_2").size()).isEqualTo(3)
+    
assertThat(nestedTypeInfo.getFlatFields("_2").get(0).getPosition).isEqualTo(1)
+    
assertThat(nestedTypeInfo.getFlatFields("_2").get(1).getPosition).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("_2").get(2).getPosition).isEqualTo(3)
+    assertThat(nestedTypeInfo.getFlatFields("_4").size()).isEqualTo(2)
+    
assertThat(nestedTypeInfo.getFlatFields("_4").get(0).getPosition).isEqualTo(5)
+    
assertThat(nestedTypeInfo.getFlatFields("_4").get(1).getPosition).isEqualTo(6)
+    assertThat(nestedTypeInfo.getFlatFields("0").get(0).getType)
+      .isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(nestedTypeInfo.getFlatFields("1.1").get(0).getType)
+      .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    assertThat(nestedTypeInfo.getFlatFields("1").get(2).getType)
+      .isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
+    assertThat(nestedTypeInfo.getFlatFields("3").get(1).getType)
+      .isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
 
     val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, 
Int)), Int)]
       .asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
-    Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size)
-    Assert.assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition)
-    Assert.assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition)
-    Assert.assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition)
-    Assert.assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size)
-    Assert.assertEquals(0, 
deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition)
-    Assert.assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition)
-    Assert.assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition)
-    Assert.assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition)
-    Assert.assertEquals(4, 
deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition)
+    assertThat(deepNestedTupleTypeInfo.getFlatFields("1").size()).isEqualTo(3)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition).isEqualTo(1)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition).isEqualTo(2)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition).isEqualTo(3)
+    assertThat(deepNestedTupleTypeInfo.getFlatFields("*").size()).isEqualTo(5)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition).isEqualTo(0)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition).isEqualTo(1)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition).isEqualTo(2)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition).isEqualTo(3)
+    
assertThat(deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition).isEqualTo(4)
 
     val caseClassTypeInfo =
       
createTypeInformation[CustomCaseClass].asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
-    Assert.assertEquals(0, 
caseClassTypeInfo.getFlatFields("a").get(0).getPosition)
-    Assert.assertEquals(1, 
caseClassTypeInfo.getFlatFields("b").get(0).getPosition)
-    Assert.assertEquals(2, caseClassTypeInfo.getFlatFields("*").size)
-    Assert.assertEquals(0, 
caseClassTypeInfo.getFlatFields("*").get(0).getPosition)
-    Assert.assertEquals(1, 
caseClassTypeInfo.getFlatFields("*").get(1).getPosition)
+    
assertThat(caseClassTypeInfo.getFlatFields("a").get(0).getPosition).isEqualTo(0)
+    
assertThat(caseClassTypeInfo.getFlatFields("b").get(0).getPosition).isEqualTo(1)
+    assertThat(caseClassTypeInfo.getFlatFields("*").size()).isEqualTo(2)
+    
assertThat(caseClassTypeInfo.getFlatFields("*").get(0).getPosition).isEqualTo(0)
+    
assertThat(caseClassTypeInfo.getFlatFields("*").get(1).getPosition).isEqualTo(1)
 
     val caseClassInTupleTypeInfo = createTypeInformation[(Int, 
UmlautCaseClass)]
       .asInstanceOf[CaseClassTypeInfo[(Int, UmlautCaseClass)]]
-    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition)
-    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("1.ß").get(0).getPosition)
-    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").size)
-    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition)
-    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition)
-    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2.*").size)
-    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition)
-    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition)
-    Assert.assertEquals(3, caseClassInTupleTypeInfo.getFlatFields("*").size)
-    Assert.assertEquals(0, 
caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition)
-    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition)
-    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition).isEqualTo(1)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("1.ß").get(0).getPosition).isEqualTo(2)
+    assertThat(caseClassInTupleTypeInfo.getFlatFields("1").size()).isEqualTo(2)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition).isEqualTo(1)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition).isEqualTo(2)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("_2.*").size()).isEqualTo(2)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition).isEqualTo(1)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition).isEqualTo(2)
+    assertThat(caseClassInTupleTypeInfo.getFlatFields("*").size()).isEqualTo(3)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition).isEqualTo(0)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition).isEqualTo(1)
+    
assertThat(caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition).isEqualTo(2)
 
   }
 
@@ -527,46 +512,45 @@ class TypeInformationGenTest {
 
     val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)]
       .asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("0"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("2"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("_2"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("_4"))
+    
assertThat(tupleTypeInfo.getTypeAt("0")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(tupleTypeInfo.getTypeAt("2")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(tupleTypeInfo.getTypeAt("_2")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(tupleTypeInfo.getTypeAt("_4")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
 
     val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, 
(Double, Double))]
       .asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, 
Double))]]
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("0"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.0"))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.1"))
-    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.2"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("2"))
-    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.0"))
-    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.1"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("_3"))
-    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("_4._1"))
-    Assert.assertEquals(createTypeInformation[(Int, String, Long)], 
nestedTypeInfo.getTypeAt("1"))
-    Assert.assertEquals(createTypeInformation[(Double, Double)], 
nestedTypeInfo.getTypeAt("3"))
-    Assert.assertEquals(createTypeInformation[(Int, String, Long)], 
nestedTypeInfo.getTypeAt("_2"))
-    Assert.assertEquals(createTypeInformation[(Double, Double)], 
nestedTypeInfo.getTypeAt("_4"))
+    
assertThat(nestedTypeInfo.getTypeAt("0")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("1.0")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("1.1")).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("1.2")).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("2")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("3.0")).isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("3.1")).isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("_3")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("_4._1")).isEqualTo(BasicTypeInfo.DOUBLE_TYPE_INFO)
+    
assertThat(nestedTypeInfo.getTypeAt("1")).isEqualTo(createTypeInformation[(Int, 
String, Long)])
+    
assertThat(nestedTypeInfo.getTypeAt("3")).isEqualTo(createTypeInformation[(Double,
 Double)])
+    
assertThat(nestedTypeInfo.getTypeAt("_2")).isEqualTo(createTypeInformation[(Int,
 String, Long)])
+    
assertThat(nestedTypeInfo.getTypeAt("_4")).isEqualTo(createTypeInformation[(Double,
 Double)])
 
     val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, 
Int)), Int)]
       .asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
-    Assert.assertEquals(
-      createTypeInformation[(Int, (Int, Int))],
-      deepNestedTupleTypeInfo.getTypeAt("1"))
+    assertThat(deepNestedTupleTypeInfo.getTypeAt("1"))
+      .isEqualTo(createTypeInformation[(Int, (Int, Int))])
 
     val umlautCaseClassTypeInfo =
       
createTypeInformation[UmlautCaseClass].asInstanceOf[CaseClassTypeInfo[UmlautCaseClass]]
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
umlautCaseClassTypeInfo.getTypeAt("ä"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
umlautCaseClassTypeInfo.getTypeAt("ß"))
+    
assertThat(umlautCaseClassTypeInfo.getTypeAt("ä")).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    
assertThat(umlautCaseClassTypeInfo.getTypeAt("ß")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
 
     val caseClassTypeInfo =
       
createTypeInformation[CustomCaseClass].asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
     val caseClassInTupleTypeInfo = createTypeInformation[(Int, 
CustomCaseClass)]
       .asInstanceOf[CaseClassTypeInfo[(Int, CustomCaseClass)]]
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
caseClassInTupleTypeInfo.getTypeAt("_2.a"))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
caseClassInTupleTypeInfo.getTypeAt("1.b"))
-    Assert.assertEquals(caseClassTypeInfo, 
caseClassInTupleTypeInfo.getTypeAt("1"))
-    Assert.assertEquals(caseClassTypeInfo, 
caseClassInTupleTypeInfo.getTypeAt("_2"))
+    
assertThat(caseClassInTupleTypeInfo.getTypeAt("_2.a")).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO)
+    
assertThat(caseClassInTupleTypeInfo.getTypeAt("1.b")).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    
assertThat(caseClassTypeInfo).isEqualTo(caseClassInTupleTypeInfo.getTypeAt("1"))
+    
assertThat(caseClassTypeInfo).isEqualTo(caseClassInTupleTypeInfo.getTypeAt("_2"))
 
   }
 
@@ -589,7 +573,7 @@ class TypeInformationGenTest {
   @Test
   def testUnit(): Unit = {
     val ti = createTypeInformation[Unit]
-    Assert.assertTrue(ti.isInstanceOf[UnitTypeInfo])
+    assertThat(ti).isInstanceOf(classOf[UnitTypeInfo])
 
     // This checks the condition in checkCollection. If this fails with 
IllegalArgumentException,
     // then things like "env.fromElements((),(),())" won't work.
@@ -610,17 +594,17 @@ class TypeInformationGenTest {
     // make sure that we still create the correct inner element serializer, 
despite the generics
     val innerTraversableSerializer = 
outerTraversableSerializer.elementSerializer
       .asInstanceOf[TraversableSerializer[Seq[Int], Int]]
-    Assert.assertEquals(
-      classOf[IntSerializer],
-      innerTraversableSerializer.elementSerializer.getClass)
+    assertThat(innerTraversableSerializer.elementSerializer.getClass)
+      .isEqualTo(classOf[IntSerializer])
 
     // if the code in here had Ts it would not compile. This would already 
fail above when
     // creating the serializer. This is just to verify.
-    Assert.assertEquals(
-      "implicitly[scala.collection.generic.CanBuildFrom[" +
-        "Seq[Seq[Object]], Seq[Object], Seq[Seq[Object]]]" +
-        "]",
-      outerTraversableSerializer.cbfCode)
+    assertThat(outerTraversableSerializer.cbfCode)
+      .isEqualTo(
+        "implicitly[scala.collection.generic.CanBuildFrom[" +
+          "Seq[Seq[Object]], Seq[Object], Seq[Seq[Object]]]" +
+          "]"
+      )
   }
 
   @Test
@@ -633,14 +617,13 @@ class TypeInformationGenTest {
 
     val innerTraversableSerializer = 
outerTraversableSerializer.elementSerializer
       .asInstanceOf[TraversableSerializer[Seq[Int], Int]]
-    Assert.assertEquals(
-      classOf[IntSerializer],
-      innerTraversableSerializer.elementSerializer.getClass)
-
-    Assert.assertEquals(
-      "implicitly[scala.collection.generic.CanBuildFrom[" +
-        "Seq[Seq[Int]], Seq[Int], Seq[Seq[Int]]]" +
-        "]",
-      outerTraversableSerializer.cbfCode)
+    assertThat(innerTraversableSerializer.elementSerializer.getClass)
+      .isEqualTo(classOf[IntSerializer])
+
+    assertThat(outerTraversableSerializer.cbfCode)
+      .isEqualTo(
+        "implicitly[scala.collection.generic.CanBuildFrom[" +
+          "Seq[Seq[Int]], Seq[Int], Seq[Seq[Int]]]" +
+          "]")
   }
 }
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
index 05fd26d6f3d..228a816368d 100644
--- 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
@@ -19,26 +19,20 @@ package org.apache.flink.api.scala.typeutils
 
 import 
org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, 
TypeSerializerSnapshotSerializationUtil}
 import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
-import org.apache.flink.util.TestLogger
 
-import org.junit.{Rule, Test}
-import org.junit.Assert._
-import org.junit.rules.TemporaryFolder
-import org.scalatest.junit.JUnitSuiteLike
+import org.assertj.core.api.Assertions.{assertThat, fail}
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.io.TempDir
 
 import java.io._
 import java.net.{URL, URLClassLoader}
+import java.nio.file.{Files, Path}
 
 import scala.reflect.NameTransformer
 import scala.tools.nsc.{Global, Settings}
 import scala.tools.nsc.reporters.ConsoleReporter
 
-class EnumValueSerializerCompatibilityTest extends TestLogger with 
JUnitSuiteLike {
-
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
+class EnumValueSerializerCompatibilityTest {
 
   val enumName = "EnumValueSerializerUpgradeTestEnum"
 
@@ -81,42 +75,46 @@ class EnumValueSerializerCompatibilityTest extends 
TestLogger with JUnitSuiteLik
 
   /** Check that identical enums don't require migration */
   @Test
-  def checkIdenticalEnums(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
+  def checkIdenticalEnums(@TempDir tempFolder: Path): Unit = {
+    assertThat(checkCompatibility(enumA, enumA, 
tempFolder).isCompatibleAsIs).isTrue
   }
 
   /** Check that appending fields to the enum does not require migration */
   @Test
-  def checkAppendedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
+  def checkAppendedField(@TempDir tempFolder: Path): Unit = {
+    assertThat(checkCompatibility(enumA, enumB, 
tempFolder).isCompatibleAsIs).isTrue
   }
 
   /** Check that removing enum fields makes the snapshot incompatible. */
   @Test
-  def checkRemovedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
+  def checkRemovedField(@TempDir tempFolder: Path): Unit = {
+    assertThat(checkCompatibility(enumA, enumC, 
tempFolder).isIncompatible).isTrue
   }
 
   /** Check that changing the enum field order makes the snapshot 
incompatible. */
   @Test
-  def checkDifferentFieldOrder(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
+  def checkDifferentFieldOrder(@TempDir tempFolder: Path): Unit = {
+    assertThat(checkCompatibility(enumA, enumD, 
tempFolder).isIncompatible).isTrue
   }
 
   /** Check that changing the enum ids causes a migration */
   @Test
-  def checkDifferentIds(): Unit = {
-    assertTrue(
-      "Different ids should be incompatible.",
-      checkCompatibility(enumA, enumE).isIncompatible)
+  def checkDifferentIds(@TempDir tempFolder: Path): Unit = {
+    assertThat(checkCompatibility(enumA, enumE, tempFolder).isIncompatible)
+      .as("Different ids should be incompatible.")
+      .isTrue
   }
 
   def checkCompatibility(
       enumSourceA: String,
-      enumSourceB: String): 
TypeSerializerSchemaCompatibility[Enumeration#Value] = {
+      enumSourceB: String,
+      tempFolder: Path): TypeSerializerSchemaCompatibility[Enumeration#Value] 
= {
     import EnumValueSerializerCompatibilityTest._
 
-    val classLoader = compileAndLoadEnum(tempFolder.newFolder(), 
s"$enumName.scala", enumSourceA)
+    val classLoader = compileAndLoadEnum(
+      Files.createTempDirectory(tempFolder, "classLoader").toFile,
+      s"$enumName.scala",
+      enumSourceA)
 
     val enum = instantiateEnum[Enumeration](classLoader, enumName)
 
@@ -133,7 +131,10 @@ class EnumValueSerializerCompatibilityTest extends 
TestLogger with JUnitSuiteLik
     val bais = new ByteArrayInputStream(baos.toByteArray)
     val input = new DataInputViewStreamWrapper(bais)
 
-    val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), 
s"$enumName.scala", enumSourceB)
+    val classLoader2 = compileAndLoadEnum(
+      Files.createTempDirectory(tempFolder, "classLoader2").toFile,
+      s"$enumName.scala",
+      enumSourceB)
 
     val snapshot2 = TypeSerializerSnapshotSerializationUtil
       .readSerializerSnapshot[Enumeration#Value](input, classLoader2)
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
index 26c60a25dad..9a9faf9c369 100644
--- 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
@@ -17,25 +17,22 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.util.TestLogger
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
-import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnitSuiteLike
-
-class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
+class EnumValueSerializerTest {
 
   /**
    * Tests that the snapshot configuration can be created and that the serializer is compatible when
    * being called with the created serializer snapshot
    */
   @Test
-  def testEnumValueSerializerEnsureCompatibilityIdempotency() {
+  def testEnumValueSerializerEnsureCompatibilityIdempotency(): Unit = {
     val enumSerializer = new EnumValueSerializer(Letters)
 
     val snapshot = enumSerializer.snapshotConfiguration()
 
-    assertTrue(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs)
+    assertThat(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs).isTrue
   }
 }
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/InstantiationUtilTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/InstantiationUtilTest.scala
index bef3015fcc8..fe6bcc60937 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/InstantiationUtilTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/InstantiationUtilTest.scala
@@ -17,10 +17,10 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.util.{InstantiationUtil, TestLogger}
+import org.apache.flink.util.InstantiationUtil
 
-import org.hamcrest.Matchers
-import org.junit.{Assert, Test}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
 import java.io.ByteArrayOutputStream
 
@@ -28,7 +28,7 @@ import java.io.ByteArrayOutputStream
  * Serialization/Deserialization tests of Scala types using the
  * [[org.apache.flink.util.InstantiationUtil]].
  */
-class InstantiationUtilTest extends TestLogger {
+class InstantiationUtilTest {
 
   @Test
   def testNestedScalaTypeSerDe(): Unit = {
@@ -36,7 +36,7 @@ class InstantiationUtilTest extends TestLogger {
 
     val copy = serializeDeserializeInstance(instance)
 
-    Assert.assertThat(copy, Matchers.equalTo(instance))
+    assertThat(copy).isEqualTo(instance)
   }
 
   @Test
@@ -56,7 +56,7 @@ class InstantiationUtilTest extends TestLogger {
 
     val copy = serializeDeserializeInstance(instance)
 
-    Assert.assertThat(copy, Matchers.equalTo(instance))
+    assertThat(copy).isEqualTo(instance)
   }
 
   private def serializeDeserializeInstance[T](instance: T): T = {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerReflectionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerReflectionTest.scala
index 17261a9a55d..ef7fb89b5fa 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerReflectionTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerReflectionTest.scala
@@ -18,13 +18,12 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializerReflectionTest._
-import org.apache.flink.util.TestLogger
 
-import org.junit.Assert.assertEquals
-import org.junit.Test
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.junit.jupiter.api.Test
 
 /** Test obtaining the primary constructor of a case class via reflection. */
-class ScalaCaseClassSerializerReflectionTest extends TestLogger {
+class ScalaCaseClassSerializerReflectionTest {
 
   @Test
   def usageExample(): Unit = {
@@ -33,7 +32,7 @@ class ScalaCaseClassSerializerReflectionTest extends TestLogger {
 
     val actual = constructor(Array("hi", 1.asInstanceOf[AnyRef]))
 
-    assertEquals(SimpleCaseClass("hi", 1), actual)
+    assertThat(actual).isEqualTo(SimpleCaseClass("hi", 1))
   }
 
   @Test
@@ -43,7 +42,7 @@ class ScalaCaseClassSerializerReflectionTest extends TestLogger {
 
     val actual = constructor(Array(1.asInstanceOf[AnyRef]))
 
-    assertEquals(Generic[Int](1), actual)
+    assertThat(actual).isEqualTo(Generic[Int](1))
   }
 
   @Test
@@ -53,7 +52,7 @@ class ScalaCaseClassSerializerReflectionTest extends TestLogger {
 
     val actual = constructor(Array(List(1, 2, 3), "hey"))
 
-    assertEquals(HigherKind(List(1, 2, 3), "hey"), actual)
+    assertThat(actual).isEqualTo(HigherKind(List(1, 2, 3), "hey"))
   }
 
   @Test
@@ -63,16 +62,19 @@ class ScalaCaseClassSerializerReflectionTest extends TestLogger {
 
     val actual = constructor(Array("a", "b", 7.asInstanceOf[AnyRef]))
 
-    assertEquals(("a", "b", 7), actual)
+    assertThat(actual).isEqualTo(("a", "b", 7))
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def unsupportedInstanceClass(): Unit = {
 
     val outerInstance = new OuterClass
 
-    ScalaCaseClassSerializer
-      .lookupConstructor(classOf[outerInstance.InnerCaseClass])
+    assertThatThrownBy(
+      () =>
+        ScalaCaseClassSerializer
+          .lookupConstructor(classOf[outerInstance.InnerCaseClass]))
+      .isInstanceOf(classOf[IllegalArgumentException])
   }
 
   @Test
@@ -87,7 +89,7 @@ class ScalaCaseClassSerializerReflectionTest extends TestLogger {
 
     val actual = constructor(arguments)
 
-    assertEquals(Measurement(1, new DegreeCelsius(0.5f)), actual)
+    assertThat(actual).isEqualTo(Measurement(1, new DegreeCelsius(0.5f)))
   }
 }
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
index ffd1cf2141b..6fded259af5 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
@@ -22,31 +22,29 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, ResultTypeQueryable}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.TypeExtractionTest.{CustomBeanClass, CustomTypeInputFormat}
-import org.apache.flink.util.TestLogger
 
-import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnitSuiteLike
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
 import scala.beans.BeanProperty
 
-class TypeExtractionTest extends TestLogger with JUnitSuiteLike {
+class TypeExtractionTest {
 
   @Test
   def testResultTypeQueryable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val producedType = env.createInput(new CustomTypeInputFormat).getType()
-    assertEquals(producedType, BasicTypeInfo.LONG_TYPE_INFO)
+    assertThat(producedType).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
   }
 
   @Test
   def testBeanPropertyClass(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val producedType = env.fromElements(new CustomBeanClass()).getType()
-    assertTrue(producedType.isInstanceOf[PojoTypeInfo[_]])
+    assertThat(producedType).isInstanceOf(classOf[PojoTypeInfo[_]])
     val pojoTypeInfo = producedType.asInstanceOf[PojoTypeInfo[_]]
-    assertEquals(pojoTypeInfo.getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO)
-    assertEquals(pojoTypeInfo.getTypeAt(1), BasicTypeInfo.LONG_TYPE_INFO)
+    assertThat(pojoTypeInfo.getTypeAt(0)).isEqualTo(BasicTypeInfo.INT_TYPE_INFO)
+    assertThat(pojoTypeInfo.getTypeAt(1)).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO)
   }
 
 }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
index b2b9643b35f..f04a022d9e7 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
@@ -25,74 +25,72 @@ import org.apache.flink.api.java.typeutils.{EitherTypeInfo => JavaEitherTypeInfo
 import org.apache.flink.api.java.typeutils.TypeInfoFactoryTest._
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.TypeInfoFactoryTest._
-import org.apache.flink.util.TestLogger
 
-import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnitSuiteLike
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 
 import java.lang.reflect.Type
 import java.util
 
-class TypeInfoFactoryTest extends TestLogger with JUnitSuiteLike {
+class TypeInfoFactoryTest {
 
   @Test
   def testSimpleType(): Unit = {
     val ti = createTypeInformation[ScalaIntLike]
-    assertEquals(INT_TYPE_INFO, ti)
+    assertThat(ti).isEqualTo(INT_TYPE_INFO)
   }
 
   @Test
   def testMyTuple(): Unit = {
     val ti = createTypeInformation[MyTuple[Double, String]]
-    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    assertThat(ti).isInstanceOf(classOf[MyTupleTypeInfo[_, _]])
     val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
-    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
-    assertEquals(STRING_TYPE_INFO, mtti.getField1)
+    assertThat(mtti.getField0).isEqualTo(DOUBLE_TYPE_INFO)
+    assertThat(mtti.getField1).isEqualTo(STRING_TYPE_INFO)
   }
 
   @Test
-  def testMyTupleHierarchy() {
+  def testMyTupleHierarchy(): Unit = {
     val ti = createTypeInformation[MyTuple2]
-    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    assertThat(ti).isInstanceOf(classOf[MyTupleTypeInfo[_, _]])
     val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
-    assertEquals(STRING_TYPE_INFO, mtti.getField0)
-    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+    assertThat(mtti.getField0).isEqualTo(STRING_TYPE_INFO)
+    assertThat(mtti.getField1).isEqualTo(BOOLEAN_TYPE_INFO)
 
     val ti2 = createTypeInformation[MyScalaTupleClass]
-    assertTrue(ti2.isInstanceOf[MyTupleTypeInfo[_, _]])
+    assertThat(ti2).isInstanceOf(classOf[MyTupleTypeInfo[_, _]])
     val mtti2 = ti2.asInstanceOf[MyTupleTypeInfo[_, _]]
-    assertEquals(STRING_TYPE_INFO, mtti.getField0)
-    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+    assertThat(mtti2.getField0).isEqualTo(STRING_TYPE_INFO)
+    assertThat(mtti2.getField1).isEqualTo(BOOLEAN_TYPE_INFO)
   }
 
   @Test
   def testMyTupleHierarchyWithCaseClass(): Unit = {
     val ti = createTypeInformation[MyScalaTupleCaseClass]
-    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    assertThat(ti).isInstanceOf(classOf[MyTupleTypeInfo[_, _]])
     val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
-    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
-    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+    assertThat(mtti.getField0).isEqualTo(DOUBLE_TYPE_INFO)
+    assertThat(mtti.getField1).isEqualTo(BOOLEAN_TYPE_INFO)
   }
 
   @Test
   def testMyEitherGenericType(): Unit = {
     val ti = createTypeInformation[MyScalaEither[String, (Double, Int)]]
-    assertTrue(ti.isInstanceOf[JavaEitherTypeInfo[_, _]])
+    assertThat(ti).isInstanceOf(classOf[JavaEitherTypeInfo[_, _]])
     val eti = ti.asInstanceOf[JavaEitherTypeInfo[_, _]]
-    assertEquals(STRING_TYPE_INFO, eti.getLeftType)
-    assertTrue(eti.getRightType.isInstanceOf[CaseClassTypeInfo[_]])
+    assertThat(eti.getLeftType).isEqualTo(STRING_TYPE_INFO)
+    assertThat(eti.getRightType).isInstanceOf(classOf[CaseClassTypeInfo[_]])
     val cti = eti.getRightType.asInstanceOf[CaseClassTypeInfo[_]]
-    assertEquals(DOUBLE_TYPE_INFO, cti.getTypeAt(0))
-    assertEquals(INT_TYPE_INFO, cti.getTypeAt(1))
+    assertThat(cti.getTypeAt(0)).isEqualTo(DOUBLE_TYPE_INFO)
+    assertThat(cti.getTypeAt(1)).isEqualTo(INT_TYPE_INFO)
   }
 
   @Test
   def testScalaFactory(): Unit = {
     val ti = createTypeInformation[MyScalaOption[Double]]
-    assertTrue(ti.isInstanceOf[MyScalaOptionTypeInfo])
+    assertThat(ti).isInstanceOf(classOf[MyScalaOptionTypeInfo])
     val moti = ti.asInstanceOf[MyScalaOptionTypeInfo]
-    assertEquals(DOUBLE_TYPE_INFO, moti.elementType)
+    assertThat(moti.elementType).isEqualTo(DOUBLE_TYPE_INFO)
   }
 }
 


Reply via email to