Heidy, which state backend are you using? With RocksDB, Flink has to
serialize and deserialize the state on every access and update, but with
the FsStateBackend your sparse matrix sits in memory on the JVM heap and
only has to be serialized during checkpointing.
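
To make the difference concrete, here is a minimal sketch in plain Java.
HeapValueStore and SerializingValueStore are hypothetical stand-ins, not
Flink classes, and Java serialization is only a placeholder for whichever
serializer Flink would actually pick for your type. It only illustrates why
a small change to one large value gets expensive when every read and write
goes through bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;

// Hypothetical stand-ins (NOT Flink classes): they only mimic how a heap-based
// backend and a RocksDB-style backend treat a single value state.
class SerdeCostSketch {

    /** Heap-style store: keeps the live object, nothing is serialized per access. */
    static final class HeapValueStore<T> {
        private T value;
        T value() { return value; }
        void update(T newValue) { value = newValue; }
    }

    /** RocksDB-style store: keeps only bytes, so each read and write pays ser/de. */
    static final class SerializingValueStore<T> {
        private byte[] bytes;
        long serializations;
        long deserializations;

        @SuppressWarnings("unchecked")
        T value() {
            if (bytes == null) {
                return null;
            }
            deserializations++;
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject(); // rebuilds the whole object graph
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        void update(T newValue) {
            serializations++;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(newValue); // writes the whole object graph
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            bytes = buffer.toByteArray();
        }
    }

    public static void main(String[] args) {
        // Assumption: a large list stands in for the sparse matrix.
        ArrayList<Integer> matrix = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            matrix.add(i);
        }

        HeapValueStore<ArrayList<Integer>> heap = new HeapValueStore<>();
        heap.update(matrix);
        long start = System.nanoTime();
        for (int event = 0; event < 200; event++) {
            ArrayList<Integer> current = heap.value(); // same live object
            current.add(event);
            heap.update(current);
        }
        long heapMillis = (System.nanoTime() - start) / 1_000_000;

        SerializingValueStore<ArrayList<Integer>> serialized = new SerializingValueStore<>();
        serialized.update(matrix);
        start = System.nanoTime();
        for (int event = 0; event < 200; event++) {
            ArrayList<Integer> current = serialized.value(); // full deserialization
            current.add(event);                              // tiny change
            serialized.update(current);                      // full serialization
        }
        long serdeMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("heap-style store:        " + heapMillis + " ms");
        System.out.println("serializing-style store: " + serdeMillis + " ms, "
                + serialized.serializations + " ser / "
                + serialized.deserializations + " de");
    }
}
```

The serializing store does a full round trip of the whole value for every
event, however small the change. That is also the usual reason a MapState
behaves better than one big ValueState on RocksDB: each map entry is
serialized on its own.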

David

On Wed, Sep 9, 2020 at 2:41 PM Heidi Hazem Mohamed <h.ha...@nu.edu.eg>
wrote:

> Hi Walther,
>
> Many thanks for your answer. I declared the state type as below:
>
> ValueStateDescriptor<SparseBinaryMatrix> descriptor =
>         new ValueStateDescriptor<>(
>                 "Rating Matrix",
>                 TypeInformation.of(new TypeHint<SparseBinaryMatrix>() {}));
>
>
> Is there a better way?
>
> Regards,
>
> Heidy
> ------------------------------
> *From:* Timo Walther <twal...@apache.org>
> *Sent:* Wednesday, September 9, 2020 1:58 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Slow Performance inquiry
>
> Hi Hazem,
>
> I guess your performance is mostly driven by the serialization overhead
> in this case. How do you declare your state type?
>
> Flink comes with different serializers. Not all of them are extracted
> automatically when using reflective extraction methods:
>
> - Note that a `Serializable` declaration has no effect for Flink, other
> than NOT using Flink's efficient serializers.
> - Flink's POJO serializer only works with a default constructor present.
> - Row needs an explicit declaration of fields.
>
> Regards,
> Timo
>
>
> On 09.09.20 13:08, Heidi Hazem Mohamed wrote:
> > Dear,
> >
> > I am writing a Flink program (a recommender system) that needs a matrix
> > as state: the rating matrix. Because the matrix is very sparse, I
> > implemented a sparse binary matrix that stores only the ones rather than
> > the whole matrix, to save memory, and I use it as the data type of a
> > ValueState. Unexpectedly, performance became terrible and the job became
> > very slow. Any suggestions on how to find the problem?
> >
> > My first implementation of the rating matrix state:
> >
> > MapState<String, Map<String, Float>> ratingMatrix;
> >
> >
> > The second implementation (the slow one) of the rating matrix state:
> >
> > ValueState<SparseBinaryMatrix> userItemRatingHistory;
> >
> >
> > and this is part of the SparseBinaryMatrix class:
> >
> > public class SparseBinaryMatrix implements Serializable {
> >
> >     private ArrayList<Row> content;
> >
> >     private int rowLength;
> >
> >     private HashMap<String, Integer> columnLabels;
> >     private HashMap<Integer, String> inverseColumnLabels;
> >
> >     private HashMap<String, Integer> rowLabels;
> >     private HashMap<Integer, String> inverseRowLabels;
> >
> >     private enum LabelerType {Row, Column};
> >
> >     public Integer colNumber;
> >     public Integer rowNumber;
> >
> >     // This constructor initializes the matrix with zeros
> >     public SparseBinaryMatrix(int rows, int columns) {
> >         content = new ArrayList<>(rows);
> >         rowLength = columns;
> >         // for (int i = 0; i < rows; i++)
> >         //     content.add(new Row(columns));
> >     }
> >
> >
> >
> > Could the dependency on another class (Row) lead to this terrible
> > performance? Row is a class I implemented myself, and this is part of it:
> >
> > public class Row implements Serializable {
> >     // This is an alternating sorted array
> >     private ArrayList<Integer> content;
> >     private int length = 0;
> >
> >     public Row(int numbColumns) {
> >         length = numbColumns;
> >         for (int i = 0; i < numbColumns; i++)
> >             setColumnToZero(i);
> >     }
> >
> >     public Row(int[] initialValues) {
> >         length = initialValues.length;
> >         content = new ArrayList<>(length);
> >         for (int i = 0; i < length; i++)
> >             setColumn(i, initialValues[i]);
> >     }
> >
> >
> > Regards,
> >
> > Heidy
> >
>
>
