Hello all,

We use Apache Spark 3.2.0, and our data is stored on Apache Hadoop in Parquet
format.

One of the advantages of the Parquet format is predicate pushdown filtering,
which allows only the necessary data to be read. Spark supports this feature
well. For example, in the following query, Spark detects the predicate
condition and uses Parquet metadata to read the least amount of data:

SELECT * FROM files WHERE fieldA IN (1, 2);
-- Predicate pushdown: fieldA IN (1, 2), which filters Parquet row groups and
-- column pages using the column's metadata
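The same behavior can be seen with the DataFrame API. A minimal sketch
(spark-shell style; the path and column name are made up for illustration):

import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/data/files")
df.filter(col("fieldA").isin(1, 2)).explain()
// The file scan node of the physical plan should show something like
// PushedFilters: [In(fieldA, [1,2])], i.e. the predicate is checked against
// Parquet row-group/page statistics before rows are decoded.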

This feature also works when we use the `Column.isInCollection()` function,
and again Spark derives the pushdown filters correctly. However, the larger
the list passed to `isInCollection`, the larger the generated query plan
becomes, to the point where in practice it cannot be serialized and sent to
the workers for execution. There is an open issue to improve this:
https://issues.apache.org/jira/browse/SPARK-29048
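For reference, our usage looks roughly like the following sketch (the list
contents and names are illustrative):

import org.apache.spark.sql.functions.col

// In reality the keys are loaded from elsewhere; this is just for illustration.
val keys: Seq[Long] = (1L to 2000000L)

val df = spark.read.parquet("/data/files")
val filtered = df.filter(col("fieldA").isInCollection(keys))
// Every element becomes a literal inside the In(...) expression, so the plan
// grows linearly with the list size and eventually fails to serialize.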

Unfortunately, we run into problems with lists of more than 1 million members,
and queries become very slow even for sizes close to that. At the same time,
the list is small enough to fit comfortably in the memory of every worker
(even with nearly 1 billion members).

One might suggest that the appropriate approach here is to broadcast this list
as a variable and use it later. When we tried this, the list members still
ended up in the query plan, and it was again impossible to send the query to
the workers because of its size. (Maybe we made a mistake in using this
feature, and I would be grateful if someone who specializes in this area could
guide us.)
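What we tried looks roughly like this (a sketch with illustrative names). As
far as we can tell, `broadcastKeys.value` is resolved on the driver while the
Column expression is being built, so the elements are inlined as literals into
the plan anyway:

import org.apache.spark.sql.functions.col

val keys: Seq[Long] = (1L to 2000000L)               // illustrative
val broadcastKeys = spark.sparkContext.broadcast(keys)

val df = spark.read.parquet("/data/files")
// .value is called on the driver when this expression is constructed, so this
// ends up equivalent to isInCollection(keys): the literals are still embedded
// in the plan and the plan-size problem remains.
val filtered = df.filter(col("fieldA").isInCollection(broadcastKeys.value))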

Another suggestion might be to save this list as a data source and use a join
when querying. The problem with this method is that we lose the predicate
pushdown advantage, because Spark reads the whole dataset and only then
performs the join. (We need the list to be applied while the Parquet files are
being read, not after reading. Dynamic partition pruning (DPP) also did not
help us, because that feature only applies to partitioned data and has nothing
to do with Parquet metadata.)
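The join variant looks roughly like this sketch (illustrative names); even
with a broadcast hint, the Parquet scan itself has no pushed filter on fieldA,
so all row groups are read before the join:

import spark.implicits._
import org.apache.spark.sql.functions.broadcast

val keys: Seq[Long] = (1L to 2000000L)               // illustrative
val keysDf = keys.toDF("fieldA")

val df = spark.read.parquet("/data/files")
// A broadcast hash join avoids the shuffle, but the scan of /data/files shows
// an empty PushedFilters for fieldA: every row group is still read, and the
// list is applied only after the rows have been materialized.
val joined = df.join(broadcast(keysDf), Seq("fieldA"))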

In summary, we are looking for a way to keep a large list in worker memory
(our resources can guarantee this) and then, while reading the Parquet files,
filter their row groups and column pages against this in-memory list (using
Parquet metadata). Is there a way to do this in Spark? I would be very
grateful for any help.
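In case it helps to see the baseline we want to improve on: as far as we
understand, a plain filter against a broadcast set (sketch below, illustrative
names) keeps the plan small, but it is applied only after the rows have been
read and decoded and never touches the Parquet row-group or page statistics,
which is exactly the part we are missing:

import org.apache.spark.sql.functions.{col, udf}

val keySet: Set[Long] = (1L to 2000000L).toSet       // illustrative
val broadcastSet = spark.sparkContext.broadcast(keySet)

// The lookup runs on the executors against the broadcast set, so the plan
// stays small, but PushedFilters stays empty: every row group is read and
// decoded before this predicate is evaluated.
val inList = udf((v: Long) => broadcastSet.value.contains(v))
val filtered = spark.read.parquet("/data/files").filter(inList(col("fieldA")))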
