wj1918 commented on a change in pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#discussion_r428622562



##########
File path: connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
##########
@@ -77,91 +68,34 @@ public void start(Map<String, String> props) {
 
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
-        if (stream == null) {
-            try {
-                stream = Files.newInputStream(Paths.get(filename));
-                Map<String, Object> offset = 
context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, 
filename));
-                if (offset != null) {
-                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
-                    if (lastRecordedOffset != null && !(lastRecordedOffset 
instanceof Long))
-                        throw new ConnectException("Offset position is the 
incorrect type");
-                    if (lastRecordedOffset != null) {
-                        log.debug("Found previous offset, trying to skip to 
file offset {}", lastRecordedOffset);
-                        long skipLeft = (Long) lastRecordedOffset;
-                        while (skipLeft > 0) {
-                            try {
-                                long skipped = stream.skip(skipLeft);
-                                skipLeft -= skipped;
-                            } catch (IOException e) {
-                                log.error("Error while trying to seek to 
previous offset in file {}: ", filename, e);
-                                throw new ConnectException(e);
-                            }
-                        }
-                        log.debug("Skipped to offset {}", lastRecordedOffset);
-                    }
-                    streamOffset = (lastRecordedOffset != null) ? (Long) 
lastRecordedOffset : 0L;
-                } else {
-                    streamOffset = 0L;
-                }
-                reader = new BufferedReader(new InputStreamReader(stream, 
StandardCharsets.UTF_8));
-                log.debug("Opened {} for reading", logFilename());
-            } catch (NoSuchFileException e) {
-                log.warn("Couldn't find file {} for FileStreamSourceTask, 
sleeping to wait for it to be created", logFilename());
-                synchronized (this) {
-                    this.wait(1000);
-                }
-                return null;
-            } catch (IOException e) {
-                log.error("Error while trying to open file {}: ", filename, e);
-                throw new ConnectException(e);
-            }
-        }
+        if (!fileStreamBuffer.ensureOpen(() -> 
context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, 
filename))))
+            return null;
 
         // Unfortunately we can't just use readLine() because it blocks in an 
uninterruptible way.
         // Instead we have to manage splitting lines ourselves, using simple 
backoff when no new data
         // is available.
         try {
-            final BufferedReader readerCopy;

Review comment:
       Remove the local copy of the reader; `poll()` is single-threaded for this type of task.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to