Re: outerjoin not joining after window
> Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes?

That's correct.

On 5/21/24 10:11 AM, Chad Preisler wrote:
> After reviewing the logs, I think I understand what happens with the repartition topics. Looks like they will be assigned to one or more instances. In my example I ran three instances of the application (A, B, C). Looks like the two repartition topics got assigned to A and B. The six partitions from the input topics got split evenly across all three running instances A, B, and C. Since the repartitioned streams are what I'm joining on, I guess the join will run on two instances, and any input topic processing will run across all three. Is that correct?
>
> Still would like clarification regarding some records appearing to not get processed: I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes?
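To make the confirmed behaviour concrete, here is a minimal sketch in plain Python (not Kafka Streams code). It assumes the simplified model described in the question: one stream time per partition, advanced only by the timestamps of records that arrive on that partition. The class name `StreamTimeTracker` and the partition numbers are hypothetical and exist only for illustration.

```python
from collections import defaultdict

MINUTE_MS = 60_000


class StreamTimeTracker:
    """Toy model: one stream time per partition, advanced only by records."""

    def __init__(self):
        # -1 means "no record seen yet" for that partition.
        self._stream_time = defaultdict(lambda: -1)

    def observe(self, partition, record_ts_ms):
        # Stream time never moves backwards: an out-of-order record
        # leaves the partition's stream time unchanged.
        self._stream_time[partition] = max(self._stream_time[partition], record_ts_ms)
        return self._stream_time[partition]

    def stream_time(self, partition):
        return self._stream_time[partition]


tracker = StreamTimeTracker()
tracker.observe(0, 0)                # busy partition
tracker.observe(1, 0)                # low-volume partition
tracker.observe(0, 45 * MINUTE_MS)   # only partition 0 keeps receiving records

print(tracker.stream_time(0) // MINUTE_MS)  # 45
print(tracker.stream_time(1) // MINUTE_MS)  # 0
```

In this model partition 1 is still at minute 0 no matter how much wall-clock time passes, so a 30-minute window on that partition stays open until another record shows up there.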
Re: outerjoin not joining after window
After reviewing the logs, I think I understand what happens with the repartition topics. Looks like they will be assigned to one or more instances. In my example I ran three instances of the application (A, B, C). Looks like the two repartition topics got assigned to A and B. The six partitions from the input topics got split evenly across all three running instances A, B, and C. Since the repartitioned streams are what I'm joining on, I guess the join will run on two instances, and any input topic processing will run across all three. Is that correct?

Still would like clarification regarding some records appearing to not get processed: I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes?

On Tue, May 21, 2024 at 10:27 AM Chad Preisler wrote:
> See one small edit below...
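The observation above (join on two instances, input processing on three) can be reproduced with a little arithmetic. This is a rough sketch, not the real Kafka Streams assignor: it assumes one stream thread per instance, ignores standby tasks, and relies on the rule that a sub-topology gets as many tasks as the largest partition count among its source topics. The function names are hypothetical.

```python
def task_count(source_partition_counts):
    """Tasks for a sub-topology: the max partition count of its source topics."""
    return max(source_partition_counts)


def max_busy_instances(num_tasks, num_instances):
    """Upper bound on instances that can own at least one task of a sub-topology."""
    return min(num_tasks, num_instances)


instances = 3  # A, B, C

# Sub-topology reading a 6-partition input topic.
input_tasks = task_count([6])
# Join sub-topology reading two repartition topics with 2 partitions each.
join_tasks = task_count([2, 2])

print(max_busy_instances(input_tasks, instances))  # 3
print(max_busy_instances(join_tasks, instances))   # 2
```

Under these assumptions, repartitioning to 2 partitions caps the join at two working instances, while the upstream processing of the 6-partition inputs can still spread over all three.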
Re: outerjoin not joining after window
See one small edit below...

On Tue, May 21, 2024 at 10:25 AM Chad Preisler wrote:
> Hello,
>
> I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes?
>
> If I use the repartition method on each input topic to reduce the number of partitions for those streams, how many instances of the application will process records? For example, if the input topics each have 6 partitions, and I use the repartition method to set the number of partitions for the streams to 2, how many instances of the application will process records?
>
> Thanks,
> Chad
Re: outerjoin not joining after window
Hello,

I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the topic after the window closes?

If I use the repartition method on each input topic to reduce the number of partitions for those streams, how many instances of the application will process records? For example, if the input topics each have 6 partitions, and I use the repartition method to set the number of partitions for the streams to 2, how many instances of the application will process records?

Thanks,
Chad

On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax wrote:
>>> How do you know this?
>> First thing we do is write a log message in the value joiner. We don't see the log message for the missed records.
>
> Well, for left/right join results, the ValueJoiner would only be called when the window is closed... And for invalid input (or late records, i.e., records that arrive out-of-order after their window was already closed), records would be dropped right away. So you cannot really infer whether a record made it into the join or not, or what happens if it did make it into the `Processor`.
>
> -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>
> `dropped-records-total` is the name of the metric.
>
> -Matthias
Re: outerjoin not joining after window
>> How do you know this?
> First thing we do is write a log message in the value joiner. We don't see the log message for the missed records.

Well, for left/right join results, the ValueJoiner would only be called when the window is closed... And for invalid input (or late records, i.e., records that arrive out-of-order after their window was already closed), records would be dropped right away. So you cannot really infer whether a record made it into the join or not, or what happens if it did make it into the `Processor`.

-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.

-Matthias

On 5/1/24 11:35 AM, Chad Preisler wrote:
> Hello,
>
> We did some testing in our test environment today. We are seeing some records processed where only one side of the join has a record. So that's good. However, we are still seeing some records get skipped. They never hit the value joiner (we write a log message first thing in the value joiner). During the test we were putting some load on the system, so stream time was advancing. We did notice that the join windows were taking much longer than 30 minutes to close and process records. Thirty minutes is the window plus grace.
>
>> How do you know this?
> First thing we do is write a log message in the value joiner. We don't see the log message for the missed records.
>
> I will try pushing the same records locally. However, we don't see any errors in our logs and the stream does process one-sided joins after the skipped record. Do you have any docs on the "dropped records" metric? I did a Google search and didn't find many good results for that.
>
> Thanks,
> Chad
Re: outerjoin not joining after window
Hello,

We did some testing in our test environment today. We are seeing some records processed where only one side of the join has a record. So that's good. However, we are still seeing some records get skipped. They never hit the value joiner (we write a log message first thing in the value joiner). During the test we were putting some load on the system, so stream time was advancing. We did notice that the join windows were taking much longer than 30 minutes to close and process records. Thirty minutes is the window plus grace.

> How do you know this?
First thing we do is write a log message in the value joiner. We don't see the log message for the missed records.

I will try pushing the same records locally. However, we don't see any errors in our logs and the stream does process one-sided joins after the skipped record. Do you have any docs on the "dropped records" metric? I did a Google search and didn't find many good results for that.

Thanks,
Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax wrote:
>> Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally.
>
> That gives some hope :)
>
>> However, they never get into the join.
>
> How do you know this?
>
> Did you check the metric for dropped records? Maybe records are considered malformed and dropped? Are you using the same records in production and in your local test?
>
>> Are there any settings for the stream client that would affect the join?
>
> Not that I can think of... There is one more internal config, but as long as data is flowing, it should not impact the result you see.
>
>> Are there any settings on the broker side that would affect the join?
>
> No. The join is computed client side. Broker configs should not have any impact.
>
>> If I increase the log level for the streams API would that shed some light on what is happening?
>
> I don't think it would help much. The code in question is org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it does not do any logging except WARN for the already mentioned "dropping malformed" records that is also recorded via JMX.
>
>> WARN: "Skipping record due to null key or value. "
>
> If you can identify a specific record from the input which would produce an output, but does not, maybe you can try to feed it into your local test env and try to reproduce the issue?
>
> -Matthias
Re: outerjoin not joining after window
Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. That gives some hope :) However, they never get into the join. How do you know this? Did you check the metric for dropper records? Maybe records are considers malformed and dropped? Are you using the same records in production and in your local test? Are there any settings for the stream client that would affect the join? Not that I can think of... There is one more internal config, but as long as data is flowing, it should not impact the result you see. Are there any settings on the broker side that would affect the join? No. The join is computed client side. Broker configs should not have any impact. f I increase the log level for the streams API would that shed some light on what is happening? I don't think it would help much. The code in question is org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it does not do any logging except WARN for the already mentioned "dropping malformed" records that is also recorded via JMX. WARN: "Skipping record due to null key or value. " If you can identify a specific record from the input which would produce an output, but does not, maybe you can try to feed it into your local test env and try to re-produce the issue? -Matthias On 4/30/24 11:38 AM, Chad Preisler wrote: Matthias, Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. I'm not sure why the join is not working as expected when running against our actual brokers. We are peeking at the records for the streams and we are seeing the records get pulled. However, they never get into the join. It's been over 24 hours since the expected records were created, and there has been plenty of traffic to advance the stream time. 
Only records that have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join? Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it locally, there is a lot more data going through the app when running on our actual servers.

If I increase the log level for the streams API would that shed some light on what is happening? Does anyone know if there are specific packages that I should increase the log level for? Any specific log message I can home in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad

On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote:

> I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null

Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace period has passed and the window is closed (not when the window end time is reached).

> One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages?

It's an internal store that holds all records that may be emitted as a left/right join result, i.e., if there is no inner join result. The format used is internal:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java

Also note: time is based on event-time, i.e., if the input stream stops sending new records, "stream-time" will stop advancing and the result might not be emitted because the window does not get closed.

(Last, there is an internal wall-clock delay of one second before results are emitted, for performance reasons...)

HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me.

I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed?

I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this issue?

Thanks,
Chad
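The drop behaviour discussed in this message (records with a null key or value are skipped with a WARN and counted as dropped) can be pictured with a small toy model. This is only a sketch in plain Python, not Kafka Streams code: `filter_join_input` and the `(key, value, timestamp)` tuples are hypothetical names invented for illustration, and the real check lives inside the join processor.

```python
from typing import List, Optional, Tuple

# Hypothetical record shape for this sketch: (key, value, timestamp_ms).
Record = Tuple[Optional[str], Optional[str], int]


def filter_join_input(records: List[Record]) -> Tuple[List[Record], int]:
    """Return the records a join would consider, plus a count of dropped ones.

    Toy model only: a record whose key or value is None is skipped and
    counted, mirroring the "Skipping record due to null key or value"
    warning quoted in the thread.
    """
    accepted: List[Record] = []
    dropped = 0
    for key, value, ts in records:
        if key is None or value is None:
            dropped += 1  # the real system reports this through a metric
            continue
        accepted.append((key, value, ts))
    return accepted, dropped


if __name__ == "__main__":
    sample = [("a", "left-1", 1_000), (None, "left-2", 2_000), ("b", None, 3_000)]
    kept, dropped = filter_join_input(sample)
    print(kept, dropped)
```

If a production record never reaches the ValueJoiner, comparing a count like this against the dropped-records metric is one way to check whether it was rejected before the join logic ran.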
Re: outerjoin not joining after window
Matthias,

Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against our actual brokers. We are peeking at the records for the streams and we are seeing the records get pulled. However, they never get into the join. It's been over 24 hours since the expected records were created, and there has been plenty of traffic to advance the stream time. Only records that have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join? Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it locally, there is a lot more data going through the app when running on our actual servers.

If I increase the log level for the streams API would that shed some light on what is happening? Does anyone know if there are specific packages that I should increase the log level for? Any specific log message I can home in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad

On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote:

> > I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null
>
> Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace period has passed and the window is closed (not when the window end time is reached).
>
> > One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages?
>
> It's an internal store that holds all records that may be emitted as a left/right join result, i.e., if there is no inner join result. The format used is internal:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>
> Also note: time is based on event-time, i.e., if the input stream stops sending new records, "stream-time" will stop advancing and the result might not be emitted because the window does not get closed.
>
> (Last, there is an internal wall-clock delay of one second before results are emitted, for performance reasons...)
>
> HTH.
>
> -Matthias
>
> On 4/30/24 6:51 AM, Chad Preisler wrote:
> > Hello,
> >
> > I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me.
> >
> > I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed?
> >
> > I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS".
> >
> > Are there any other topics that I should be looking at to troubleshoot this issue?
> >
> > Thanks,
> > Chad
Re: outerjoin not joining after window
> I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null

Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace period has passed and the window is closed (not when the window end time is reached).

> One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages?

It's an internal store that holds all records that may be emitted as a left/right join result, i.e., if there is no inner join result. The format used is internal:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java

Also note: time is based on event-time, i.e., if the input stream stops sending new records, "stream-time" will stop advancing and the result might not be emitted because the window does not get closed.

(Last, there is an internal wall-clock delay of one second before results are emitted, for performance reasons...)

HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me.

I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed?

I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this issue?

Thanks,
Chad
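The timing rule in this reply (a one-sided result appears only once the window plus grace has passed in event time, and stream time moves only when records arrive) can be sketched as a toy model. This is plain Python written for illustration, not the Kafka Streams implementation. `OuterJoinModel` is a hypothetical name, every record is treated as unmatched, late-record handling is ignored, and the exact boundary comparison is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

MINUTE_MS = 60_000


@dataclass
class OuterJoinModel:
    """Toy model of when an unmatched record becomes eligible for emission.

    Simplifying assumptions: stream time is the highest record timestamp
    seen so far, and an unmatched record is emitted once stream time is
    strictly greater than its timestamp + window + grace.
    """
    window_ms: int
    grace_ms: int
    stream_time: int = -1
    pending: List[Tuple[str, int]] = field(default_factory=list)

    def process(self, key: str, timestamp_ms: int) -> List[Tuple[str, int]]:
        # Stream time only moves forward, and only when a record arrives.
        self.stream_time = max(self.stream_time, timestamp_ms)
        self.pending.append((key, timestamp_ms))
        # A pending record's window is closed once stream time has passed
        # its timestamp plus the window size plus the grace period.
        closed = [(k, ts) for k, ts in self.pending
                  if ts + self.window_ms + self.grace_ms < self.stream_time]
        self.pending = [p for p in self.pending if p not in closed]
        return closed


if __name__ == "__main__":
    model = OuterJoinModel(window_ms=25 * MINUTE_MS, grace_ms=5 * MINUTE_MS)
    print(model.process("k1", 0))               # [] (window still open)
    print(model.process("k2", 29 * MINUTE_MS))  # [] (29 min < 30 min)
    print(model.process("k3", 31 * MINUTE_MS))  # [('k1', 0)]
```

In this model, if no record arrives after "k2", "k1" stays pending indefinitely regardless of how much wall-clock time passes, which is the low-traffic situation the reply warns about.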
outerjoin not joining after window
Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me.

I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed?

I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this issue?

Thanks,
Chad
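Per the replies in this thread, the grace period is added on top of the 25-minute time difference, so the relevant threshold is 30 minutes of event time rather than 25. A minimal arithmetic sketch follows, assuming the window closes at record time plus time difference plus grace. `earliest_emit_event_time` is a hypothetical helper for illustration, not a Kafka Streams API, and the sample timestamp is made up.

```python
from datetime import datetime, timedelta


def earliest_emit_event_time(record_time: datetime,
                             time_difference: timedelta,
                             grace: timedelta) -> datetime:
    """Event time that stream time must pass before a one-sided result can appear.

    Assumes the simplified rule: record time + time difference + grace.
    """
    return record_time + time_difference + grace


if __name__ == "__main__":
    record_time = datetime(2024, 4, 30, 6, 0)  # made-up event timestamp
    threshold = earliest_emit_event_time(
        record_time, timedelta(minutes=25), timedelta(minutes=5))
    print(threshold)  # 2024-04-30 06:30:00
```

The threshold is in event time: it is reached only when later records carry timestamps beyond it, not when the wall clock does.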