Hi,

我在[email protected]列表中也发送了这个问题,已经得到回复说问题的原因如下:

The root cause is the wrong mapping of the state key to the state. This
> kind of wrong mapping occurs when the key is switched, but the state is not
> used. As you wrote in the example, the `data` you declared is not used in
> `process_element2`
>

目前已经开了issue来追踪这个问题,并且会在下个release-1.13.2中包含修复。

赵飞 <[email protected]> 于2021年7月12日周一 下午4:53写道:

> Hi,
>
> 以下是完整代码。需要补充的是,相同条件下运行相同代码,结果可能会不一样,有时候正常,有时候能够复现问题。
>
> import random
>
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedCoProcessFunction
> from pyflink.datastream.state import MapStateDescriptor
> from pyflink.datastream import RuntimeContext
>
>
> def test(data):
>     product_ids = set()
>     for key, value in data.items():
>         product_ids.add(value[0])
>     return list(product_ids)
>
>
> class MyFunction(KeyedCoProcessFunction):
>     def open(self, ctx):
>         data_desc = MapStateDescriptor('data', Types.STRING(), 
> Types.ROW([Types.INT()]))
>         self.data = ctx.get_map_state(data_desc)
>
>         rule_desc = MapStateDescriptor('rule', Types.STRING(), 
> Types.ROW([Types.INT()]))
>         self.rules = ctx.get_map_state(rule_desc)
>
>     def process_element1(self, data_value, ctx):
>         row_id, others = data_value[0], data_value[1:]
>         self.data.put(row_id, others)
>         result = []
>         for key, value_list in self.rules.items():
>             product_id, random_0, random_1  = value_list
>             # Do some calculations
>             product_ids_of_state_data = test(self.data)
>             result.append([random_0, random_1, product_id, 
> product_ids_of_state_data])
>         return result
>
>     def process_element2(self, rule_value, ctx):
>         row_id, others = rule_value[0], rule_value[1:]
>         self.rules.put(row_id, others)
>
> def generate_data1(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>     return collection
>
> def generate_data2(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i 
> * 2])
>     return collection
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     data = env.from_collection(generate_data1(50))
>     rules = env.from_collection([
>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(), 
> Types.STRING()]))
>     results = data.connect(rules).key_by(lambda x: x[1], lambda y: 
> y[1]).process(MyFunction())
>     results.print()
>
>     env.execute("test_job")
>
> if __name__ == "__main__":
>     main()
>
>
> Dian Fu <[email protected]> 于2021年7月12日周一 下午4:48写道:
>
>> Hi,
>>
>> 是否发一下可复现的完整示例?
>>
>> Regards,
>> Dian
>>
>> > 2021年7月10日 下午7:44,赵飞 <[email protected]> 写道:
>> >
>> > 各位好,请教一个问题。
>> >
>> >
>> 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
>> >
>> > -------
>> > results = data.connect(rules).key_by('product_id',
>> > 'product_id').process(MyFunction())
>> > results.print()
>> >
>> > class MyFunction(KeyedCoProcessFunction):
>> >    def open(self, ctx):
>> >        data_desc = MapStateDescriptor('data', key_type, value_type)
>> >        self.data = ctx.get_map_state(data_desc)
>> >
>> >        rule_desc = MapStateDescriptor('rule', key_type,  value_type)
>> >        self.rules = ctx.get_map_state(rule_desc)
>> >
>> >    def process_element1(self, data_value, ctx):
>> >        row_id, others = data_value[0], data_value[1:]
>> >        self.data.put(row_id, others)
>> >        result = []
>> >        for key, value_list in self.rules.items():
>> >            product_id, random_0, random_1  = value_list
>> >            # Do some calculations
>> >            product_ids_of_state_data = OtherFunction(self.data)
>> >            result.append(random_0, random_1, product_id,
>> > product_ids_of_state_data)
>> >        return result
>> >
>> >    def process_element2(self, rule_value, ctx):
>> >        row_id, others = rule_value[0], rule_value[1:]
>> >        self.rules.put(row_id, others)
>> > ------
>> >
>> > 数据格式大致如下:
>> > # 数据流(假设第二个元素为产品id)
>> > [
>> >    ['row_0', 1, 'a_0', 2],
>> >    ['row_1', 2, 'a_1', 3],
>> >    ['row_2', 1, 'a_2', 4],
>> >    ['row_4', 2, 'a_3', 5]
>> > ]
>> >
>> > # 规则流(假设第二个元素为产品id)
>> > [
>> >    ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>> >    ['row_1', 2, 'rule2_value0', 'rule2_value1']
>> > ]
>> >
>> > 执行程序(指定全局并行度为1)后,得到的输出类似于:
>> > ['rule1_value0',  'rule1_value1', 1, [1, 2]]
>> > ['rule2_value0',  'rule2_value1', 2, [1, 2]]
>> >
>> 从输出来看,当某产品的数据进来时,只使用了其对应的规则进行了处理,可以表明规则确实按产品id分区了,但是维护数据的MapState中却包含了多个产品id的数据。
>> >
>> > 更进一步的现象为:
>> > 0. 如果数据流中的数据一直按照先产品1,后产品2的顺序,那么能够正常分区。但是如果无法保证这个顺序,则会出现以上描述的问题
>> > 1. 一旦数据流中的元素数量超过50,那么便会出现以上现象(上述所说的“规则按照产品id正确分区”也有可能只是规则流的数量未超过上限)
>> >
>> >
>> 按照官网的描述,某个键值的数据应只能访问到属于该键值的状态,在这个例子中,我的理解是维护数据的MapState中应该有且仅包含一个产品的数据。请问是我理解有误?还是这确实是个问题?
>>
>>

回复