[ 
https://issues.apache.org/jira/browse/BEAM-607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279614#comment-16279614
 ] 

ASF GitHub Bot commented on BEAM-607:
-------------------------------------

jkff closed pull request #1464: BEAM-607: Add a bounded source for Apache 
DistributedLog (incubating)
URL: https://github.com/apache/beam/pull/1464
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/distributedlog/README.md 
b/sdks/java/io/distributedlog/README.md
new file mode 100644
index 00000000000..c3b025821ca
--- /dev/null
+++ b/sdks/java/io/distributedlog/README.md
@@ -0,0 +1,24 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# DistributedLog IO
+
+This library provides sources and sinks that make it possible to read and
+write Apache DistributedLog (incubating) streams, in either a bounded or an
+unbounded way, from Apache Beam pipelines.
diff --git a/sdks/java/io/distributedlog/pom.xml 
b/sdks/java/io/distributedlog/pom.xml
new file mode 100644
index 00000000000..59bedce9e9c
--- /dev/null
+++ b/sdks/java/io/distributedlog/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.4.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-distributedlog</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: DistributedLog</name>
+  <description>Library to read DistributedLog streams.</description>
+
+  <properties>
+    <!-- Version of the DistributedLog client library this module is built against. -->
+    <distributedlog.version>0.3.51-RC1</distributedlog.version>
+  </properties>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <!-- FindBugs analysis is skipped for this module. -->
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>findbugs-maven-plugin</artifactId>
+          <configuration>
+            <skip>true</skip>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <!-- Tests run against a real runner rather than the dummy runner. -->
+          <systemPropertyVariables>
+            <beamUseDummyRunner>false</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>distributedlog-core_2.11</artifactId>
+      <version>${distributedlog.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
new file mode 100644
index 00000000000..1a3cff559b3
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DLBoundedSource.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * A {@code BoundedSource} for reading log streams resident in a DistributedLog cluster.
+ *
+ * <p>A source either covers every log segment of every configured stream ({@code segment} is
+ * {@code null}) or exactly one log segment, as produced by {@link #splitIntoBundles}.
+ */
+class DLBoundedSource<R> extends BoundedSource<R> {
+
+  private static final long serialVersionUID = 3551126210922557330L;
+
+  protected final URI dlUri;
+  protected final Optional<URI> confUri;
+  protected final List<String> streams;
+  protected final Coder<R> rCoder;
+  protected final @Nullable LogSegmentBundle segment;
+
+  /**
+   * @param dlUri uri of the DistributedLog namespace to read from
+   * @param confUri optional uri of a DistributedLog configuration file
+   * @param streams names of the streams to read
+   * @param rCoder coder used to decode each record payload
+   * @param segment the single log segment this source is restricted to, or {@code null} to
+   *     read every log segment of {@code streams}
+   */
+  DLBoundedSource(
+      URI dlUri,
+      Optional<URI> confUri,
+      List<String> streams,
+      Coder<R> rCoder,
+      @Nullable LogSegmentBundle segment) {
+    this.dlUri = dlUri;
+    this.confUri = confUri;
+    this.streams = streams;
+    this.rCoder = rCoder;
+    this.segment = segment;
+  }
+
+  /**
+   * Builds a namespace client for {@code dlUri}, loading the configuration file first if one
+   * is set. The caller owns the returned namespace and must close it.
+   *
+   * @throws IOException if the configuration cannot be loaded or the namespace cannot be built
+   */
+  DistributedLogNamespace setupNamespace() throws IOException {
+    DistributedLogConfiguration conf = new DistributedLogConfiguration();
+    if (confUri.isPresent()) {
+      URI configurationUri = confUri.get();
+      try {
+        conf.loadConf(configurationUri.toURL());
+      } catch (ConfigurationException e) {
+        throw new IOException(
+            "Failed to load distributedlog configuration from " + configurationUri, e);
+      } catch (MalformedURLException | IllegalArgumentException e) {
+        // URI#toURL throws IllegalArgumentException for non-absolute URIs.
+        throw new IOException(
+            "Invalid distributedlog configuration uri " + configurationUri, e);
+      }
+    }
+    return DistributedLogNamespaceBuilder.newBuilder()
+        .conf(conf)
+        .uri(dlUri)
+        .clientId("distributedlog-bounded-source-" + InetAddress.getLocalHost().getHostName())
+        .statsLogger(NullStatsLogger.INSTANCE)
+        .regionId(DistributedLogConstants.LOCAL_REGION_ID)
+        .build();
+  }
+
+  /** Returns the names of the streams this source reads. */
+  List<String> getStreams() {
+    return streams;
+  }
+
+  /**
+   * Checks that at least one stream and a record coder are configured.
+   *
+   * @throws NullPointerException if the streams or the record coder are {@code null}
+   * @throws IllegalArgumentException if no stream is configured
+   */
+  @Override
+  public void validate() {
+    checkNotNull(streams, "need to set the streams of a DLBoundedSource");
+    checkArgument(!streams.isEmpty(), "need to set the streams of a DLBoundedSource");
+    checkNotNull(rCoder, "need to set the record coder of a DLBoundedSource");
+  }
+
+  /**
+   * Lists the log segments of every stream in {@code streams}, stream by stream, in the order
+   * returned by each stream's log manager.
+   */
+  static List<LogSegmentBundle> getAllLogSegments(
+      DistributedLogNamespace namespace,
+      List<String> streams) throws IOException {
+    List<LogSegmentBundle> segments = Lists.newArrayList();
+    for (String stream : streams) {
+      segments.addAll(getLogSegments(namespace, stream));
+    }
+    return segments;
+  }
+
+  /** Lists the log segments of a single stream, opening and closing its log manager. */
+  private static List<LogSegmentBundle> getLogSegments(
+      DistributedLogNamespace namespace,
+      final String stream) throws IOException {
+    DistributedLogManager manager = namespace.openLog(stream);
+    try {
+      // Copy eagerly: Lists.transform returns a lazy view that would re-create every bundle on
+      // each access.
+      return ImmutableList.copyOf(Lists.transform(manager.getLogSegments(),
+          new Function<LogSegmentMetadata, LogSegmentBundle>() {
+            @Override
+            public LogSegmentBundle apply(@Nullable LogSegmentMetadata metadata) {
+              return new LogSegmentBundle(stream, metadata);
+            }
+          }));
+    } finally {
+      manager.close();
+    }
+  }
+
+  /**
+   * Splits this source into one source per log segment.
+   *
+   * <p>{@code desiredBundleSizeBytes} is not used: every log segment becomes exactly one bundle,
+   * and a source that is already restricted to a single segment is returned as is.
+   */
+  @Override
+  public List<? extends BoundedSource<R>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    if (null != segment) {
+      // Already a single-segment source; no need to contact the cluster.
+      return ImmutableList.of(this);
+    }
+    DistributedLogNamespace namespace = setupNamespace();
+    try {
+      // Copy eagerly so the returned list is a plain list rather than a lazy transformed view.
+      return ImmutableList.copyOf(Lists.transform(getAllLogSegments(namespace, streams),
+          new Function<LogSegmentBundle, BoundedSource<R>>() {
+            @Override
+            public BoundedSource<R> apply(@Nullable LogSegmentBundle bundle) {
+              return new DLBoundedSource<>(
+                  dlUri,
+                  confUri,
+                  streams,
+                  rCoder,
+                  bundle);
+            }
+          }));
+    } finally {
+      namespace.close();
+    }
+  }
+
+  /**
+   * Returns the size estimate used for one log segment.
+   *
+   * <p>NOTE(review): this is the segment's last entry id clamped at 0, which looks like an
+   * entry-count proxy rather than a byte count; confirm whether a byte-based size is available.
+   */
+  long getLength(LogSegmentMetadata metadata) {
+    return Math.max(0L, metadata.getLastEntryId());
+  }
+
+  /**
+   * Sums {@link #getLength} over the segment this source is restricted to, or over every
+   * segment of every stream when it is not restricted.
+   */
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    if (null != segment) {
+      return getLength(segment.getMetadata());
+    }
+    DistributedLogNamespace namespace = setupNamespace();
+    try {
+      long size = 0;
+      for (LogSegmentBundle bundle : getAllLogSegments(namespace, streams)) {
+        size += getLength(bundle.getMetadata());
+      }
+      return size;
+    } finally {
+      namespace.close();
+    }
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+    return false;
+  }
+
+  /** Validates this source, then returns a reader over its segment (or over all segments). */
+  @Override
+  public BoundedReader<R> createReader(PipelineOptions options) throws IOException {
+    this.validate();
+    return new MultipleSegmentsReader<>(this, segment);
+  }
+
+  @Override
+  public Coder<R> getDefaultOutputCoder() {
+    return rCoder;
+  }
+}
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DistributedLogIO.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DistributedLogIO.java
new file mode 100644
index 00000000000..a5cad48648e
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/DistributedLogIO.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+
+/**
+ * Sources for <a href="http://distributedlog.io">DistributedLog</a> streams.
+ *
+ * <p>Currently only a bounded source is provided; see {@link #boundedSource()}.
+ */
+public class DistributedLogIO {
+
+  /**
+   * Creates a {@link BoundedSourceBuilder} for distributedlog streams that reads {@code byte[]}
+   * records. It initially points at namespace
+   * {@code distributedlog://127.0.0.1/messaging/distributedlog} with no streams. Before use,
+   * set the namespace with {@link BoundedSourceBuilder#withNamespace(URI)} and the streams with
+   * {@link BoundedSourceBuilder#withStreams(List)}.
+   *
+   * @return a {@link BoundedSourceBuilder} for distributedlog streams.
+   */
+  public static BoundedSourceBuilder<byte[]> boundedSource() {
+    return new BoundedSourceBuilder<>(
+        URI.create("distributedlog://127.0.0.1/messaging/distributedlog"),
+        Optional.<URI>absent(),
+        Lists.<String>newArrayList(),
+        ByteArrayCoder.of(),
+        null);
+  }
+
+  /**
+   * Same as {@link #boundedSource()}.
+   *
+   * @return a {@link BoundedSourceBuilder} for distributedlog streams.
+   * @deprecated despite its name this returns a builder for a <em>bounded</em> source; use
+   *     {@link #boundedSource()} instead.
+   */
+  @Deprecated
+  public static BoundedSourceBuilder<byte[]> unboundedSource() {
+    return boundedSource();
+  }
+
+  /**
+   * Immutable builder for a {@link BoundedSource} over distributedlog streams. Every
+   * {@code withXxx} method returns a new builder and leaves this one unchanged.
+   *
+   * @param <R> type of the decoded records
+   */
+  public static class BoundedSourceBuilder<R> {
+
+    protected final URI dlUri;
+    protected final Optional<URI> confUri;
+    protected final List<String> streams;
+    protected final Coder<R> rCoder;
+    // Builders created through DistributedLogIO always carry null here (read all segments).
+    protected final @Nullable LogSegmentBundle segment;
+
+    private BoundedSourceBuilder(
+        URI dlUri,
+        Optional<URI> confUri,
+        List<String> streams,
+        Coder<R> rCoder,
+        @Nullable LogSegmentBundle segment) {
+      this.dlUri = dlUri;
+      this.confUri = confUri;
+      this.streams = streams;
+      this.rCoder = rCoder;
+      this.segment = segment;
+    }
+
+    /**
+     * Returns a new {@link BoundedSourceBuilder} pointing to namespace {@code dlUri}.
+     *
+     * @param dlUri uri of the new distributedlog namespace
+     * @return a new {@link BoundedSourceBuilder} pointing to namespace {@code dlUri}.
+     */
+    public BoundedSourceBuilder<R> withNamespace(URI dlUri) {
+      return new BoundedSourceBuilder<>(dlUri, confUri, streams, rCoder, segment);
+    }
+
+    /**
+     * Returns a new {@link BoundedSourceBuilder} pointing to the configuration file
+     * {@code confUri}. Passing {@code null} clears any previously set configuration.
+     *
+     * @param confUri uri of the distributedlog configuration file
+     * @return a new {@link BoundedSourceBuilder} pointing to configuration {@code confUri}.
+     */
+    public BoundedSourceBuilder<R> withConfiguration(URI confUri) {
+      return new BoundedSourceBuilder<>(
+          dlUri, Optional.fromNullable(confUri), streams, rCoder, segment);
+    }
+
+    /**
+     * Returns a new {@link BoundedSourceBuilder} reading the given {@code streams}.
+     *
+     * <p>The list is copied, so later changes to the caller's list do not affect the builder.
+     *
+     * @param streams new list of distributedlog streams
+     * @return a new {@link BoundedSourceBuilder} reading {@code streams}.
+     * @throws NullPointerException if {@code streams} is {@code null}
+     */
+    public BoundedSourceBuilder<R> withStreams(List<String> streams) {
+      // Defensive copy: the list ends up inside the serialized source, so it must be neither
+      // shared with the caller nor of an arbitrary (possibly non-serializable) list type.
+      List<String> streamsCopy = Lists.newArrayList(streams);
+      return new BoundedSourceBuilder<>(dlUri, confUri, streamsCopy, rCoder, segment);
+    }
+
+    /**
+     * Returns a new {@link BoundedSourceBuilder} using the record {@code coder}.
+     *
+     * @param coder coder used to decode each record payload
+     * @return a new {@link BoundedSourceBuilder} using record {@code coder}.
+     */
+    public <RT> BoundedSourceBuilder<RT> withRecordCoder(Coder<RT> coder) {
+      return new BoundedSourceBuilder<>(dlUri, confUri, streams, coder, segment);
+    }
+
+    /**
+     * Builds an initialized {@link BoundedSource} for distributedlog streams.
+     *
+     * @return the initialized {@link BoundedSource} for distributedlog streams.
+     */
+    public BoundedSource<R> build() {
+      return new DLBoundedSource<>(
+          dlUri,
+          confUri,
+          streams,
+          rCoder,
+          segment);
+    }
+
+  }
+
+}
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundle.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundle.java
new file mode 100644
index 00000000000..458f2826e64
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundle.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.twitter.distributedlog.LogSegmentMetadata;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * A log segment represents a bundle of approximately 
<code>desiredBundleSizeBytes</code>.
+ */
+class LogSegmentBundle implements Externalizable {
+
+  private static final long serialVersionUID = -1924976223810524339L;
+
+  private String streamName;
+  private LogSegmentMetadata metadata;
+
+  public LogSegmentBundle() {}
+
+  public LogSegmentBundle(
+      String streamName,
+      LogSegmentMetadata metadata) {
+    checkNotNull(metadata, "Log Segment is null");
+    this.streamName = streamName;
+    this.metadata = metadata;
+  }
+
+  String getStreamName() {
+    return streamName;
+  }
+
+  LogSegmentMetadata getMetadata() {
+    return metadata;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeObject(streamName);
+    out.writeObject(metadata.getFinalisedData());
+  }
+
+  @Override
+  public void readExternal(ObjectInput in)
+      throws IOException, ClassNotFoundException {
+    this.streamName = (String) in.readObject();
+    String metadataStr = (String) in.readObject();
+    this.metadata = LogSegmentMetadata.parseData(streamName, 
metadataStr.getBytes(UTF_8));
+  }
+
+}
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/MultipleSegmentsReader.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/MultipleSegmentsReader.java
new file mode 100644
index 00000000000..02191cb673d
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/MultipleSegmentsReader.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import com.google.common.collect.ImmutableList;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+
+/**
+ * A bounded reader that reads the records of one or more distributedlog log segments in order.
+ *
+ * <p>When constructed with a {@link LogSegmentBundle}, only that segment is read. Otherwise
+ * every log segment of the source's streams is listed in {@link #start()} and read in turn.
+ */
+class MultipleSegmentsReader<R> extends BoundedReader<R> {
+
+  private final DLBoundedSource<R> source;
+
+  private List<LogSegmentBundle> segments;
+  private ListIterator<LogSegmentBundle> segmentsIterator;
+  // read state
+  private DistributedLogNamespace namespace;
+  private SingleLogSegmentReader<R> currentReader;
+  private R currentR;
+  private volatile boolean done = false;
+
+  MultipleSegmentsReader(
+      DLBoundedSource<R> source,
+      @Nullable LogSegmentBundle bundle) {
+    this.source = source;
+    if (null != bundle) {
+      this.segments = ImmutableList.of(bundle);
+      this.segmentsIterator = segments.listIterator();
+    }
+  }
+
+  @Override
+  public boolean start() throws IOException {
+    namespace = source.setupNamespace();
+    if (null == segmentsIterator) {
+      this.segments = DLBoundedSource.getAllLogSegments(namespace, source.getStreams());
+      this.segmentsIterator = this.segments.listIterator();
+    }
+    return advance();
+  }
+
+  /**
+   * Moves to the next record. The current segment is drained first; after that, the following
+   * segments are opened one by one until one yields a record or none are left.
+   */
+  @Override
+  public boolean advance() throws IOException {
+    try {
+      if (null != currentReader && currentReader.advance()) {
+        currentR = currentReader.getCurrentR();
+        return true;
+      }
+      while (segmentsIterator.hasNext()) {
+        LogSegmentBundle nextSegment = segmentsIterator.next();
+        SingleLogSegmentReader<R> reader =
+            new SingleLogSegmentReader<>(namespace, nextSegment, source.getDefaultOutputCoder());
+        if (null != currentReader) {
+          currentReader.close();
+        }
+        currentReader = reader;
+        // start() opens the segment AND positions the reader on its first record, so its
+        // result already says whether the segment has a record. Calling advance() here as
+        // well would skip that first record.
+        if (reader.start()) {
+          currentR = reader.getCurrentR();
+          return true;
+        }
+        currentReader.close();
+        currentReader = null;
+      }
+      currentR = null;
+      done = true;
+      return false;
+    } catch (DLInterruptedException dlie) {
+      Thread.currentThread().interrupt();
+      throw dlie;
+    }
+  }
+
+  @Override
+  public R getCurrent() throws NoSuchElementException {
+    if (null == currentR) {
+      throw new NoSuchElementException();
+    }
+    return currentR;
+  }
+
+  /**
+   * Closes the current segment reader (if any) and the namespace (if {@link #start()} opened
+   * one). The namespace is closed even if closing the segment reader fails.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (null != currentReader) {
+        currentReader.close();
+        currentReader = null;
+      }
+    } finally {
+      if (null != namespace) {
+        namespace.close();
+        namespace = null;
+      }
+    }
+  }
+
+  @Override
+  public BoundedSource<R> getCurrentSource() {
+    return source;
+  }
+
+  /**
+   * Reports progress with each segment given an equal share. Within the current segment it
+   * interpolates using that segment's own progress.
+   */
+  @Nullable
+  @Override
+  public Double getFractionConsumed() {
+    if (done) {
+      return 1.0;
+    }
+    // Read the field once so a concurrent advance() cannot null it between check and use.
+    SingleLogSegmentReader<R> reader = currentReader;
+    if (null == reader) {
+      return 0.0;
+    }
+    if (segments.isEmpty()) {
+      return 1.0;
+    }
+    int index = segmentsIterator.previousIndex();
+    int numSegments = segments.size();
+    double before = 1.0 * index / numSegments;
+    double after = 1.0 * (index + 1) / numSegments;
+    return before + reader.getProgress() * (after - before);
+  }
+
+  @Override
+  public long getSplitPointsRemaining() {
+    if (done) {
+      return 0;
+    }
+    // The current implementation does not currently support dynamic work
+    // rebalancing.
+    // TODO: will improve this later.
+    return 1;
+  }
+
+  @Nullable
+  @Override
+  public BoundedSource<R> splitAtFraction(double fraction) {
+    // The current implementation does not currently support this feature.
+    // TODO: will improve this later.
+    return null;
+  }
+}
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReader.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReader.java
new file mode 100644
index 00000000000..1bf258ad9eb
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.distributedlog.*;
+import com.twitter.distributedlog.exceptions.LogEmptyException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+
+/**
+ * Reader for the records of a single log segment.
+ *
+ * <p>Call {@link #start()} once, then {@link #advance()} until it returns {@code false};
+ * {@link #getCurrentR()} returns the record the reader is positioned on. {@link #close()}
+ * releases the async log reader and the log manager.
+ */
+class SingleLogSegmentReader<R> {
+
+  static final int NUM_RECORDS_PER_BATCH = 100;
+
+  private final DistributedLogNamespace namespace;
+  private final LogSegmentBundle segment;
+
+  // coder used to decode each record payload
+  private final Coder<R> coder;
+
+  // Reader State
+  private DistributedLogManager logStream;
+  private AsyncLogReader reader;
+  private R currentR;
+  // DLSN of the last record to emit; reading stops once it is reached or passed
+  private DLSN lastDLSN;
+  private boolean done = false;
+  private int numReadRecords = 0;
+  private int totalRecords = 0;
+  private Iterator<LogRecordWithDLSN> currentRecordBatch;
+
+  SingleLogSegmentReader(DistributedLogNamespace namespace,
+                         LogSegmentBundle segment,
+                         Coder<R> coder) {
+    this.namespace = namespace;
+    this.segment = segment;
+    this.coder = coder;
+  }
+
+  @VisibleForTesting
+  DLSN getLastDLSN() {
+    return lastDLSN;
+  }
+
+  @VisibleForTesting
+  int getTotalRecords() {
+    return totalRecords;
+  }
+
+  @VisibleForTesting
+  AsyncLogReader getReader() {
+    return reader;
+  }
+
+  @VisibleForTesting
+  DistributedLogManager getLogStream() {
+    return logStream;
+  }
+
+  /**
+   * Opens the segment and positions the reader on its first record.
+   *
+   * @return {@code true} if the reader is positioned on a record, {@code false} if the segment
+   *     has no records
+   */
+  boolean start() throws IOException {
+    doStart();
+    return advance();
+  }
+
+  /** Fraction of the segment's records read so far; 1.0 when the segment has no records. */
+  double getProgress() {
+    if (totalRecords == 0) {
+      return 1.0;
+    }
+    return numReadRecords * 1.0 / totalRecords;
+  }
+
+  /**
+   * Opens the log, works out the last DLSN to read and the record total, and opens an async
+   * reader at the segment's first DLSN. Sets {@code done} instead when the segment has no
+   * records.
+   */
+  private void doStart() throws IOException {
+    logStream = namespace.openLog(segment.getStreamName());
+    LogSegmentMetadata metadata = segment.getMetadata();
+    DLSN firstDLSN = metadata.getFirstDLSN();
+    if (metadata.isInProgress()) {
+      LogRecordWithDLSN lastRecord;
+      try {
+        lastRecord = logStream.getLastLogRecord();
+      } catch (LogEmptyException | LogNotFoundException e) {
+        // no records in the stream
+        currentR = null;
+        done = true;
+        return;
+      }
+      if (lastRecord.getDlsn().compareTo(firstDLSN) < 0) {
+        // The stream's last record precedes this segment, so this in-progress segment has no
+        // records yet. Opening a reader at firstDLSN would presumably wait for records that
+        // may never be written.
+        currentR = null;
+        done = true;
+        return;
+      }
+      lastDLSN = lastRecord.getDlsn();
+      totalRecords = lastRecord.getPositionWithinLogSegment();
+    } else if (metadata.getRecordCount() == 0) {
+      // the log segment is an empty one
+      currentR = null;
+      done = true;
+      return;
+    } else {
+      lastDLSN = metadata.getLastDLSN();
+      totalRecords = metadata.getRecordCount();
+    }
+    reader = FutureUtils.result(logStream.openAsyncLogReader(firstDLSN));
+  }
+
+  /**
+   * Moves to the next record of the segment, fetching records in batches of
+   * {@link #NUM_RECORDS_PER_BATCH}.
+   *
+   * @return {@code true} if positioned on a record; {@code false} once the reader has passed
+   *     {@code lastDLSN}
+   */
+  boolean advance() throws IOException {
+    if (done) {
+      currentR = null;
+      return false;
+    }
+    LogRecordWithDLSN record = null;
+    while (null == record) {
+      if (null == currentRecordBatch) {
+        currentRecordBatch =
+            FutureUtils.result(reader.readBulk(NUM_RECORDS_PER_BATCH)).iterator();
+      }
+      if (currentRecordBatch.hasNext()) {
+        record = currentRecordBatch.next();
+      } else {
+        currentRecordBatch = null;
+      }
+    }
+    int diff = record.getDlsn().compareTo(lastDLSN);
+    if (diff < 0) {
+      // still before the last record of the segment
+      currentR = getR(record);
+      ++numReadRecords;
+      return true;
+    } else if (diff > 0) {
+      // read past the last record of the segment: stop without emitting it
+      currentR = null;
+      done = true;
+      return false;
+    } else {
+      // exactly the last record of the segment: emit it, then stop
+      currentR = getR(record);
+      done = true;
+      ++numReadRecords;
+      return true;
+    }
+  }
+
+  /** Decodes a record payload with {@link #coder} in the OUTER context. */
+  private R getR(LogRecordWithDLSN record) throws IOException {
+    return coder.decode(getRecordStream(record), Coder.Context.OUTER);
+  }
+
+  private static InputStream getRecordStream(LogRecordWithDLSN record) {
+    return new ExposedByteArrayInputStream(record.getPayload());
+  }
+
+  /** Returns the current record, or {@code null} if the reader is not positioned on one. */
+  R getCurrentR() throws NoSuchElementException {
+    return currentR;
+  }
+
+  /** Closes the async log reader and the log manager, whichever were opened. */
+  void close() throws IOException {
+    if (null != reader) {
+      Utils.close(reader);
+    }
+    if (null != logStream) {
+      logStream.close();
+    }
+  }
+
+}
diff --git 
a/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/package-info.java
 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/package-info.java
new file mode 100644
index 00000000000..49eb0a9463b
--- /dev/null
+++ 
b/sdks/java/io/distributedlog/src/main/java/org/apache/beam/sdk/io/distributedlog/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Transforms for reading and writing from Apache DistributedLog Streams.
+ */
+package org.apache.beam.sdk.io.distributedlog;
diff --git a/sdks/java/io/distributedlog/src/test/java/com/twitter/distributedlog/LogSegmentUtils.java b/sdks/java/io/distributedlog/src/test/java/com/twitter/distributedlog/LogSegmentUtils.java
new file mode 100644
index 00000000000..fa955f6b411
--- /dev/null
+++ b/sdks/java/io/distributedlog/src/test/java/com/twitter/distributedlog/LogSegmentUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataBuilder;
+import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
+
+/**
+ * Utils for log segment metadata
+ */
+public class LogSegmentUtils {
+
+  public static LogSegmentMetadataBuilder newBuilder(
+      String path,
+      LogSegmentMetadataVersion version,
+      long logSegmentId,
+      long firstTxId) {
+    return new LogSegmentMetadataBuilder(path, version, logSegmentId, firstTxId);
+  }
+
+  public static LogSegmentMetadata.Mutator newMutator(LogSegmentMetadata segment) {
+    return new LogSegmentMetadata.Mutator(segment);
+  }
+
+}
diff --git a/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundleTest.java b/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundleTest.java
new file mode 100644
index 00000000000..54e0559d740
--- /dev/null
+++ b/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/LogSegmentBundleTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
+import com.twitter.distributedlog.LogSegmentUtils;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test case for {@link LogSegmentBundle}
+ */
+@RunWith(JUnit4.class)
+public class LogSegmentBundleTest {
+
+  @Test(timeout = 60000)
+  public void testExternalizable() throws Exception {
+    // Create a log segment metadata
+    LogSegmentMetadata segment = LogSegmentUtils.newBuilder(
+        "test-stream",
+        LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID,
+        1234L,
+        5678L)
+        .setInprogress(false)
+        .setLastEntryId(89L)
+        .setLastTxId(89123L)
+        .setRecordCount(10000)
+        .setCompletionTime(System.currentTimeMillis())
+        .build();
+    LogSegmentBundle bundle = new LogSegmentBundle("test-stream", segment);
+    // write the segment bundle
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+    ObjectOutput oo = new ObjectOutputStream(new DataOutputStream(baos));
+    bundle.writeExternal(oo);
+    // read the segment bundle
+    byte[] data = baos.toByteArray();
+    ByteArrayInputStream bais = new ByteArrayInputStream(data);
+    ObjectInput in = new ObjectInputStream(new DataInputStream(bais));
+    LogSegmentBundle newBundle = new LogSegmentBundle();
+    newBundle.readExternal(in);
+    // verify bundles
+    assertEquals("expected stream name " + bundle.getStreamName() + " but " + newBundle.getStreamName() + " found",
+        bundle.getStreamName(), newBundle.getStreamName());
+    assertEquals("expected log segment " + bundle.getMetadata() + " but " + newBundle.getMetadata() + " found",
+        bundle.getMetadata(), newBundle.getMetadata());
+  }
+
+}
diff --git a/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReaderTest.java b/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReaderTest.java
new file mode 100644
index 00000000000..e7a591cac72
--- /dev/null
+++ b/sdks/java/io/distributedlog/src/test/java/org/apache/beam/sdk/io/distributedlog/SingleLogSegmentReaderTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.distributedlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.beam.sdk.io.distributedlog.SingleLogSegmentReader.NUM_RECORDS_PER_BATCH;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.AsyncLogReader;
+import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.LogRecordWithDLSN;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
+import com.twitter.distributedlog.LogSegmentUtils;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Future;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link SingleLogSegmentReader}
+ */
+@RunWith(JUnit4.class)
+public class SingleLogSegmentReaderTest {
+
+  @Test(timeout = 60000)
+  public void testReadEmptyLogSegment() throws Exception {
+    // Create an empty log segment metadata
+    LogSegmentMetadata segment = LogSegmentUtils.newBuilder(
+        "test-stream",
+        LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID,
+        1234L,
+        5678L)
+        .setInprogress(false)
+        .setLastEntryId(89L)
+        .setLastTxId(89123L)
+        .setRecordCount(0)
+        .setCompletionTime(System.currentTimeMillis())
+        .build();
+    LogSegmentBundle bundle = new LogSegmentBundle("test-stream", segment);
+    Coder<byte[]> coder = ByteArrayCoder.of();
+
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    SingleLogSegmentReader<byte[]> reader =
+        new SingleLogSegmentReader(mockNamespace, bundle, coder);
+    assertFalse("There is no records to read", reader.start());
+    assertNull("Reader should be null", reader.getReader());
+    assertNull("Log manager should be null", reader.getLogStream());
+    assertEquals("There is no records to read", 0L, reader.getTotalRecords());
+    assertNull("Last DLSN should be null", reader.getLastDLSN());
+    assertEquals(1.0f, reader.getProgress(), 0.0);
+    assertNull("There is no records to read", reader.getCurrentR());
+    reader.close();
+  }
+
+  private List<LogRecordWithDLSN> generateRecord(long lssn, long entryId) {
+    LogRecordWithDLSN record = new LogRecordWithDLSN(
+        new DLSN(lssn, entryId, 0L),
+        entryId,
+        ("record-" + entryId).getBytes(UTF_8),
+        0L);
+    return Lists.newArrayList(record);
+  }
+
+  @Test(timeout = 60000)
+  public void testReadClosedLogSegment() throws Exception {
+    // Create an closed log segment metadata
+    LogSegmentMetadata segment = LogSegmentUtils.newBuilder(
+        "test-stream",
+        LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID,
+        1234L,
+        5678L)
+        .setInprogress(false)
+        .setLastEntryId(89L)
+        .setLastTxId(89123L)
+        .setRecordCount(5)
+        .setCompletionTime(System.currentTimeMillis())
+        .build();
+    DLSN lastDLSN = new DLSN(3L, 4L, 0L);
+    segment = LogSegmentUtils.newMutator(segment)
+        .setLastDLSN(lastDLSN)
+        .build();
+    LogSegmentBundle bundle = new LogSegmentBundle("test-stream", segment);
+    Coder<byte[]> coder = ByteArrayCoder.of();
+
+    // create a mock reader
+    AsyncLogReader mockReader = mock(AsyncLogReader.class);
+    when(mockReader.readBulk(NUM_RECORDS_PER_BATCH))
+        .thenReturn(Future.value(generateRecord(3L, 0L)))
+        .thenReturn(Future.value(generateRecord(3L, 1L)))
+        .thenReturn(Future.value(generateRecord(3L, 2L)))
+        .thenReturn(Future.value(generateRecord(3L, 3L)))
+        .thenReturn(Future.value(generateRecord(3L, 4L)));
+    when(mockReader.asyncClose())
+        .thenReturn(Future.Void());
+    // create a mock log manager
+    DistributedLogManager mockLogManager = mock(DistributedLogManager.class);
+    when(mockLogManager.openAsyncLogReader(new DLSN(3L, 0L, 0L)))
+        .thenReturn(Future.value(mockReader));
+    // create a mock namespace
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    when(mockNamespace.openLog("test-stream"))
+        .thenReturn(mockLogManager);
+
+    // open a single log segment reader
+    SingleLogSegmentReader<byte[]> reader =
+        new SingleLogSegmentReader(mockNamespace, bundle, coder);
+    assertTrue("There will be records to read", reader.start());
+    assertTrue("Reader should not be null", mockReader == reader.getReader());
+    assertTrue("Log manager should not be null", mockLogManager == reader.getLogStream());
+    assertEquals("There is 5 records to read", 5L, reader.getTotalRecords());
+    assertEquals("Last DLSN should be " + lastDLSN, lastDLSN, reader.getLastDLSN());
+    assertEquals(0.2f, reader.getProgress(), 0.00001f);
+    assertNotNull("There will be records to read", reader.getCurrentR());
+
+    int entryId = 0;
+    byte[] record = reader.getCurrentR();
+    while (null != record) {
+      byte[] expetedRecord = ("record-" + entryId).getBytes(UTF_8);
+      assertArrayEquals(expetedRecord, record);
+      ++entryId;
+      if (entryId < 5) {
+        assertTrue("There will be more records to read", reader.advance());
+        record = reader.getCurrentR();
+      } else if (entryId == 5) {
+        assertFalse("There will be no more records to read", reader.advance());
+        assertNull("There will be no more records to read", reader.getCurrentR());
+        break;
+      }
+    }
+    assertEquals(5, entryId);
+    assertEquals(1.0, reader.getProgress(), 0.00001);
+    reader.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testReadInprogressLogSegment() throws Exception {
+    // Create an inprogress log segment metadata
+    LogSegmentMetadata segment = LogSegmentUtils.newBuilder(
+        "test-stream",
+        LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID,
+        1234L,
+        5678L)
+        .setInprogress(true)
+        .build();
+    DLSN lastDLSN = new DLSN(3L, 4L, 0L);
+    List<LogRecordWithDLSN> lastRecords = generateRecord(3L, 4L);
+    segment = LogSegmentUtils.newMutator(segment)
+        .setLastDLSN(new DLSN(3L, -1L, -1L))
+        .build();
+    LogSegmentBundle bundle = new LogSegmentBundle("test-stream", segment);
+    Coder<byte[]> coder = ByteArrayCoder.of();
+
+    // create a mock reader
+    AsyncLogReader mockReader = mock(AsyncLogReader.class);
+    when(mockReader.readBulk(NUM_RECORDS_PER_BATCH))
+        .thenReturn(Future.value(generateRecord(3L, 0L)))
+        .thenReturn(Future.value(generateRecord(3L, 1L)))
+        .thenReturn(Future.value(generateRecord(3L, 2L)))
+        .thenReturn(Future.value(generateRecord(3L, 3L)))
+        .thenReturn(Future.value(lastRecords));
+    when(mockReader.asyncClose())
+        .thenReturn(Future.Void());
+    // create a mock log manager
+    DistributedLogManager mockLogManager = mock(DistributedLogManager.class);
+    when(mockLogManager.openAsyncLogReader(new DLSN(3L, 0L, 0L)))
+        .thenReturn(Future.value(mockReader));
+    when(mockLogManager.getLastLogRecord())
+        .thenReturn(lastRecords.get(0));
+    // create a mock namespace
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    when(mockNamespace.openLog("test-stream"))
+        .thenReturn(mockLogManager);
+
+    // open a single log segment reader
+    SingleLogSegmentReader<byte[]> reader =
+        new SingleLogSegmentReader(mockNamespace, bundle, coder);
+    assertTrue("There will be records to read", reader.start());
+    assertTrue("Reader should not be null", mockReader == reader.getReader());
+    assertTrue("Log manager should not be null", mockLogManager == reader.getLogStream());
+    assertEquals("Last DLSN should be " + lastDLSN, lastDLSN, reader.getLastDLSN());
+    assertNotNull("There will be records to read", reader.getCurrentR());
+
+    int entryId = 0;
+    byte[] record = reader.getCurrentR();
+    while (null != record) {
+      byte[] expetedRecord = ("record-" + entryId).getBytes(UTF_8);
+      assertArrayEquals(expetedRecord, record);
+      ++entryId;
+      if (entryId < 5) {
+        assertTrue("There will be more records to read", reader.advance());
+        record = reader.getCurrentR();
+      } else if (entryId == 5) {
+        assertFalse("There will be no more records to read", reader.advance());
+        assertNull("There will be no more records to read", reader.getCurrentR());
+        break;
+      }
+    }
+    assertEquals(5, entryId);
+    reader.close();
+  }
+
+}
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 3750202418b..e5399695b5e 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -33,13 +33,14 @@
   (sources and sinks) to consume and produce data from systems.</description>
 
   <modules>
+    <module>distributedlog</module>
     <module>google-cloud-platform</module>
     <module>hdfs</module>
+    <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>
     <module>kinesis</module>
     <module>mongodb</module>
-    <module>jdbc</module>
   </modules>
 
 </project>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add DistributedLog IO
> ---------------------
>
>                 Key: BEAM-607
>                 URL: https://issues.apache.org/jira/browse/BEAM-607
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-extensions
>            Reporter: Khurrum Nasim
>
> I'd like to add an IO for the new DistributedLog streams - 
> http://distributedlog.io
> - bounded source and sink (sealed streams)
> - unbounded source and sink (unsealed streams)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to