wj1918 commented on a change in pull request #8612: URL: https://github.com/apache/kafka/pull/8612#discussion_r428622562
########## File path: connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java ########## @@ -77,91 +68,34 @@ public void start(Map<String, String> props) { @Override public List<SourceRecord> poll() throws InterruptedException { - if (stream == null) { - try { - stream = Files.newInputStream(Paths.get(filename)); - Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename)); - if (offset != null) { - Object lastRecordedOffset = offset.get(POSITION_FIELD); - if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) - throw new ConnectException("Offset position is the incorrect type"); - if (lastRecordedOffset != null) { - log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); - long skipLeft = (Long) lastRecordedOffset; - while (skipLeft > 0) { - try { - long skipped = stream.skip(skipLeft); - skipLeft -= skipped; - } catch (IOException e) { - log.error("Error while trying to seek to previous offset in file {}: ", filename, e); - throw new ConnectException(e); - } - } - log.debug("Skipped to offset {}", lastRecordedOffset); - } - streamOffset = (lastRecordedOffset != null) ? 
(Long) lastRecordedOffset : 0L; - } else { - streamOffset = 0L; - } - reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); - log.debug("Opened {} for reading", logFilename()); - } catch (NoSuchFileException e) { - log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename()); - synchronized (this) { - this.wait(1000); - } - return null; - } catch (IOException e) { - log.error("Error while trying to open file {}: ", filename, e); - throw new ConnectException(e); - } - } + if (!fileStreamBuffer.ensureOpen(() -> context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename)))) + return null; // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way. // Instead we have to manage splitting lines ourselves, using simple backoff when no new data // is available. try { - final BufferedReader readerCopy; Review comment: Remove the copy of the reader. poll() is single-threaded for this type of task, so the copy is unnecessary. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org