[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-17 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1265653816


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}

Review Comment:
   Sorry! Just had one more thought about this case.
   
   If the connector has been reconfigured from using a file to using stdin, 
doesn't it still make sense to allow users to interact with offsets for the 
file that it was using in the past? This is especially relevant for the [total 
reset case](https://github.com/apache/kafka/pull/13945#discussion_r1257110479) 
that you noted in the earlier discussion with Greg.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-13 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1262708616


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
    Sounds good, thanks for thinking this through.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
    Sounds good, thanks for thinking this through.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Hmmm... wouldn't that be a pretty serious breaking change if we accidentally 
switched up how the JSON converter deserializes integer types? Not just for the 
file source connector, but for plenty of others.
   
   It feels like it might be a better use of our time to make note of this 
possibility and ensure that we have sufficient unit testing in place to prevent 
that kind of regression (I suspect we already do but haven't verified this yet).
   
   Of course, because things aren't interesting enough already--it turns out 
that there's actually two different scenarios in which tasks observe offsets 
for their connector. The first, which we're all familiar with, is when they 
query them using an 
[OffsetStorageReader](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java),
 which in distributed mode reflects the contents of the offsets topic. The 
second is when 
[SourceTask::commitRecord](https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata))
 is invoked, which carries with it the just-ack'd `SourceRecord` instance 
originally provided by the task, including the original in-memory source 
partition and source offset, which may use types that get lost when written to 
and read back from the offsets topic.
   
   I don't know if this significantly changes the conversation but it seems 
subtle and counterintuitive enough to bring up so that we can avoid 
accidentally breaking connector code that relies on this behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Hmmm... wouldn't that be a pretty serious breaking change if we accidentally 
switched up how the JSON converter deserializes integer types? Not just for the 
file source connector, but for plenty of others.
   
   It feels like it might be a better use of our time to make note of this 
possibility and ensure that we have sufficient unit testing in place to prevent 
that kind of regression (I suspect we already do but haven't verified this yet).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261413255


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Ah, nice catch! I noticed the discrepancy in numeric types while working on 
[KAFKA-15177](https://issues.apache.org/jira/browse/KAFKA-15177) but hadn't 
even considered the possibility of aligning the types across invocations of 
`alterOffsets` and `OffsetStorageReader::offset`/`OffsetStorageReader::offsets`.
   
   I think re-deserializing the offsets before passing them to `alterOffsets` 
is a great idea. Unless the request body is gigantic there shouldn't be serious 
performance concerns, and it also acts as a nice preflight check to ensure that 
the offsets can be successfully propagated to the connector's tasks through the 
offsets topic.
   
   I still don't love permitting string types for the connector's `position` 
offset values--it doesn't seem like a great endorsement of our API if we have 
to implement workarounds in the file connectors, which are the first example of 
the connector API that many developers see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259882152


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   I'd prefer to leave the task parsing the same; less work on our part, and 
less risk of a regression in existing parts of the code base.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259782359


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;

Review Comment:
   Shouldn't this be `continue`?



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked

Review Comment:
   We can remove this comment now, right?



##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("other_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+assertTrue(connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectOffsetKey() {
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap("other_offset_key", 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsOffsetPositionValues() {

Review Comment:
   Maybe 

[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-10 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1258640264


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Should we tell users that this is because stdin will be used and we don't do 
any offset tracking in that case?



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+}
+
+// Let the task validate the actual value for the offset position on 
startup

Review Comment:
   Any reason not to do this preemptively? We could at least validate that the 
value for the "position" key is non-null, is a numeric type, and is 
non-negative.



##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("invalid_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+connector.alterOffsets(sourceProperties, offsets);

Review