Hi Alex! Thank you for the list! The build of the modified cdh5-trunk branch (debug mode) was sucessfull. After replacing "impala-frontend-0.1-SNAPSHOT.jar" in /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/ I got the following error in my existing cluster: F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError: getCatalogObjects When I switch back to the original jar file the error is gone. So it must be something wrong with this file I guess. But I wonder about the error in catalog.cc because I didn't touch any .cc files.

I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other jar files do not exist in my impala installation (CDH-5.13.1).

What am I doing wrong?

Best regards

Am 13.04.2018 um 20:12 schrieb Alexander Behm:
Here's the foll list. It might not be minimal, but copying/overwriting these should work.


If you are only modifying the Java portion (like DistributedPlanner), then only copying/replacing the *.jar files should be sufficient.

On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <philippkrause.m...@googlemail.com <mailto:philippkrause.m...@googlemail.com>> wrote:

    Yes, I have a running (virtual) cluster. I would try to follow
    your way with the custom impala build (DistributedPlanner.java is
    the only modified file at the moment). Thank you in advance for
    the file list!

    Best regards

    Alexander Behm <alex.b...@cloudera.com
    <mailto:alex.b...@cloudera.com>> schrieb am Fr., 13. Apr. 2018, 18:45:

        I'm not really following your installation/setup and am not an
        expert on Cloudera Manager installation/config. If you are
        going to build Impala anyway, it's probably easiest to test on
        Impala's minicluster first.

        In general, if you have a running Cloudera Managed cluster,
        you can deploy a custom Impala build by simply overwriting the
        Impala existing binaries and jars with the new build. If you
        want to go this route, I can give you a full list of files you
        need to replace.

        On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
        <mailto:philippkrause.m...@googlemail.com>> wrote:

            Thank you for the explanation! Yes, I'm using HDFS. The
            single replica setup is only for test purposes at the
            moment. I think this makes it easier to gain some first
            results since less modifications (scheduler etc.) are
            I would like to test the DistributedPlanner modification
            in my virtual cluster. I used a customized Vagrant script
            to install Impala on multiple hosts (s.attachment). It
            simply installs cloudera-manager-server-db,
            cloudera-manager-server and cloudera-manager-daemons via
            apt-get. What would be the simplest solution to setup my
            modified version? Could I simply call ./buildall.sh and
            change the script to sth. like this?

            echo "Install java..."
            apt-get -q -y --force-yes install oracle-j2sdk1.7
            echo "Download impala..."
            wget https://... where I uploaded my modified version
            echo "Extract impala..."
            tar -xvzf Impala-cdh5-trunk.tar.gz
            cd Impala-cdh5-trunk
            echo "Build impala..."
            echo "Start impala instances..."
            service cloudera-scm-server-db initdb
            service cloudera-scm-server-db start
            service cloudera-scm-server start

            Or is there another, maybe even easier method, to test the
            code? Maybe via bootstrap_development.sh / minicluster?

            Best regards

            2018-04-05 18:39 GMT+02:00 Alexander Behm
            <alex.b...@cloudera.com <mailto:alex.b...@cloudera.com>>:

                Apologies for the late response. Btw, your previous
                post was clear enough to me, so no worries :)

                On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
                <mailto:philippkrause.m...@googlemail.com>> wrote:

                    Hello Alex,

                    I think my previous post has been too long and
                    confusing. I apologize for that!

                    If replicas are completely deactivated, all scan
                    ranges of a block are mapped to the one host,
                    where the block is located on. This host is the
                    "executor"/reader for all the scan ranges of this
                    block. Is that correct?

                Yes, assuming you are using HDFS.

                    I tried to visualize my understanding of the
                    scan_range to host mapping for my use case (s.
                    attachment). Could you please have a quick look at
                    it and tell me if this is correct?

                    "The existing scan range assignment is scan-node
                    centric. For each scan node, we independently
                    decide which of its scan ranges should be
                    processed by which host."
                    Without replicas, all scan ranges of a block would
                    be assigend to the same host where this block is
                    located on. Isn't everything local here, so that
                    Table_A - Block_0 and Table_B - Block_0 can be
                    joined local or are further steps neccessary? The
                    condition in the DistributedPlanner you pointed to
                    me is set to false (no exchange nodes).

                    "You want it to be host-centric. For each host,
                    collect the local scan ranges of *all* scan nodes,
                    and assign them to that host."
                    Wouldn't the standard setup from above work?
                    Wouldn't I assign all (the same) scan ranges to
                    each host in this case here?

                The standard setup works only in if every block only
                has exactly one replica. For our purposes, that is
                basically never the case (who would store production
                data without replication?), so the single-replica
                assumption was not clear to me.

                Does your current setup (only changing the planner and
                not the scheduler) produce the expected results?

                    Thank you very much!

                    Best regards

                    2018-03-28 21:04 GMT+02:00 Philipp Krause

                        Thank you for your answer and sorry for my delay!

                        If my understanding is correct, the list of
                        scan nodes consists of all nodes which contain
                        a *local* block from a table that is needed
                        for the query (Assumption: I have no replicas
                        in my first tests). If TableA-Block0 is on
                        Node_0, isn't Node_0 automatically a scan
                        node? And wouldn't this scan node always be
                        the host for the complete scan range(s) then?

                        "For each scan node, we independently decide
                        which of its scan ranges should be processed
                        by which host."


                        // Loop over all scan ranges, select an
                        executor for those with local impalads and
                        // collect all others for later processing.

                        So in this whole block, scan ranges are
                        assigned to the closest executor (=host?). But
                        isn't the closest executor always the node the
                        block is located on (assumed impalad is
                        installed and I have no replicas)? And isn't
                        this node always a scan node at the same time?
                        Otherwise a thread on a remote host had to
                        read the corresponding scan range, which would
                        be more expensive. The only exception I can
                        think of is when all threads on the local node
                        are busy. Or, if I use replicas and all other
                        threads of my node with the "original" block
                        are busy, a thread on another node which
                        contains a replica could read a special scan
                        range of its local block. Is my understanding
                        correct here?

                        Aren't all scan ranges read locally by its
                        scan nodes if I have impalad installed on all
                        nodes? And am I right, that the scan range is
                        only based on its length which refers to
                        maxScanRangeLength in computeScanRangeLocations?

                        I hope you can help me with the scan node <->
                        scan range->host relationship. If I have
                        Table_A-Block_0 and Table_B_Block_0 on the
                        same node (which I want to join locally), I
                        don't get the point of why scan ranges could
                        be assigned to another host in my scenario.

                        Best regads and thank you very much!
                        Philipp Krause

                        Am 21.03.2018 um 05:21 schrieb Alexander Behm:
                        Thanks for following up. I think I understand
                        your setup.

                        If you want to not think about scan ranges,
                        then you can modify
                        HdfsScanNode.computeScanRangeLocations(). For
                        example, you could change it to produce one
                        scan range per file or per HDFS block. That
                        way you'd know exactly what a scan range
                        corresponds to.

                        I think the easiest/fastest way for you to
                        make progress is to re-implement the existing
                        scan range assignment logic in that place in
                        the code I had pointed you to. There is no
                        quick fix to change the existing behavior.
                        The existing scan range assignment is
                        scan-node centric. For each scan node, we
                        independently decide which of its scan ranges
                        should be processed by which host.

                        I believe an algorithm to achieve your goal
                        would look completely different. You want it
                        to be host-centric. For each host, collect
                        the local scan ranges of *all* scan nodes,
                        and assign them to that host.

                        Does that make sense?


                        On Mon, Mar 19, 2018 at 1:02 PM, Philipp
                        Krause <philippkrause.m...@googlemail.com

                            I'd like to provide a small example for
                            our purpose. The last post may be a bit
                            confusing, so here's a very simple
                            example in the attached pdf file. I hope,
                            it's understandable. Otherwise, please
                            give me a short feedback.

                            Basically, I only want each data node to
                            join all it's local blocks. Is there a
                            range mapping needed or is it possible to
                            easily join all local blocks (regardless
                            of its content) since everything is
                            already "prepared"? Maybe you can clarify
                            this for me.

                            As you can see in the example, the tables
                            are not partitioned by ID. The files are
                            manually prepared by the help of the
                            modulo function. So I don't have a range
                            like [0,10], but something like 0,5,10,15

                            I hope, I didn't make it too complicated
                            and confusing. I think, the actual idea
                            behind this is really simple and I hope
                            you can help me to get this working.

                            Best regards and thank you very much for
                            your time!

                            Am 18.03.2018 um 17:32 schrieb Philipp

                            Hi! At the moment the data to parquet
                            (block) mapping is based on a simple
                            modulo function: Id % #data_nodes. So
                            with 5 data nodes all rows with Id's
                            0,5,10,... are written to Parquet_0,
                            Id's 1,4,9 are written to Parquet_1 etc.
                            That's what I did manually. Since the
                            parquet file size and the block size are
                            both set to 64MB, each parquet file will
                            result in one block when I transfer the
                            parquet files to HDFS. By default, HDFS
                            distributes the blocks randomly. For
                            test purposes I transferred
                            corresponding blocks from Table_A and
                            Table_B to the same data node (Table_A -
                            Block_X with Id's 0,5,10 and Table_B -
                            Block_Y with Id's 0,5,10). In this case,
                            they are transferred to data_node_0
                            because the modulo function (which I
                            want to implement in the scheduler)
                            returns 0 for these Id's. This is also
                            done manually at the moment.

                            1.) DistributedPlanner: For first,
                            upcoming tests I simply changed the
                            first condition in the
                            DistributedPlanner to true to avoid
                            exchange nodes.

                            2.) The scheduler: That's the part I'm
                            currently struggling with. For first
                            tests, block replication is deactivated.
                            I'm not sure how / where to implement
                            the modulo function for scan range to
                            host mapping. Without the modulo
                            function, I had to implement a hard
                            coded mapping (something like "range"
                            0-0, 5-5, 10-10 -> Data_node_0 etc.). Is
                            that correct? Instead I would like to
                            use a slightly more flexible solution by
                            the help of this modulo function for the
                            host mapping.

                            I would be really grateful if you could
                            give me a hint for the scheduling
                            implementation. I try to go deeper
                            through the code meanwhile.

                            Best regards and thank you in advance

                            Am 14.03.2018 um 08:06 schrieb Philipp
                            Thank you very much for these
                            information! I'll try to implement
                            these two steps and post some updates
                            within the next days!

                            Best regards

                            2018-03-13 5:38 GMT+01:00 Alexander
                            Behm <alex.b...@cloudera.com

                                Cool that you working on a research
                                project with Impala!

                                Properly adding such a feature to
                                Impala is a substantial effort, but
                                hacking the code for an experiment
                                or two seems doable.

                                I think you will need to modify two
                                things: (1) the planner to not add
                                exchange nodes, and (2) the
                                scheduler to assign the co-located
                                scan ranges to the same host.

                                Here are a few starting points in
                                the code:

                                1) DistributedPlanner

                                The first condition handles the
                                case where no exchange nodes need
                                to be added because the join inputs
                                are already suitably partitioned.
                                You could hack the code to always
                                go into that codepath, so no
                                exchanges are added.

                                2) The scheduler

                                You'll need to dig through and
                                understand that code so that you
                                can make the necessary changes.
                                Change the scan range to host
                                mapping to your liking. The rest of
                                the code should just work.



                                On Mon, Mar 12, 2018 at 6:55 PM,
                                Philipp Krause

                                    Thank you very much for your
                                    quick answers!
                                    The intention behind this is to
                                    improve the execution time and
                                    (primarily) to examine the
                                    impact of block-co-location
                                    (research project) for this
                                    particular query (simplified):

                                    select A.x, B.y, A.z from
                                    tableA as A inner join tableB
                                    as B on A.id=B.id

                                    The "real" query includes three
                                    joins and the data size is in
                                    pb-range. Therefore several
                                    nodes (5 in the test
                                    environment with less data) are
                                    used (without any load balancer).

                                    Could you give me some hints
                                    what code changes are required
                                    and which files are affected? I
                                    don't know how to give Impala
                                    the information that it should
                                    only join the local data blocks
                                    on each node and then pass it
                                    to the "final" node which
                                    receives all intermediate
                                    results. I hope you can help me
                                    to get this working. That would
                                    be awesome!

                                    Best regards

                                    Am 12.03.2018 um 18:38 schrieb
                                    Alexander Behm:
                                    I suppose one exception is if
                                    your data lives only on a
                                    single node. Then you can set
                                    num_nodes=1 and make sure to
                                    send the query request to the
                                    impalad running on the same
                                    data node as the target data.
                                    Then you should get a local join.

                                    On Mon, Mar 12, 2018 at 9:30
                                    AM, Alexander Behm

                                        Such a specific block
                                        arrangement is very
                                        uncommon for typical
                                        Impala setups, so we don't
                                        attempt to recognize and
                                        optimize this narrow case.
                                        In particular, such an
                                        arrangement tends to be
                                        short lived if you have
                                        the HDFS balancer turned on.

                                        Without making code
                                        changes, there is no way
                                        today to remove the data
                                        exchanges and make sure
                                        that the scheduler assigns
                                        scan splits to nodes in
                                        the desired way
                                        (co-located, but with
                                        possible load imbalance).

                                        In what way is the current
                                        setup unacceptable to you?
                                        Is this pre-mature
                                        optimization? If you have
                                        certain performance
                                        for specific queries we
                                        might be able to help you
                                        improve those. If you want
                                        to pursue this route,
                                        please help us by posting
                                        complete query profiles.


                                        On Mon, Mar 12, 2018 at
                                        6:29 AM, Philipp Krause

                                            Hello everyone!

                                            In order to prevent
                                            network traffic, I'd
                                            like to perform local
                                            joins on each node
                                            instead of exchanging
                                            the data and perform a
                                            join over the complete
                                            data afterwards. My
                                            query is basically a
                                            join over three three
                                            tables on an ID
                                            attribute. The blocks
                                            are perfectly
                                            distributed, so that
                                            e.g. Table A - Block
                                            0  and Table B - Block
                                            0  are on the same
                                            node. These blocks
                                            contain all data rows
                                            with an ID range
                                            [0,1]. Table A - Block
                                            1  and Table B - Block
                                            1 with an ID range
                                            [2,3] are on another
                                            node etc. So I want to
                                            perform a local join
                                            per node because any
                                            data exchange would be
                                            unneccessary (except
                                            for the last step when
                                            the final node
                                            recevieves all results
                                            of the other nodes).
                                            Is this possible?
                                            At the moment the
                                            query plan includes
                                            multiple data
                                            exchanges, although
                                            the blocks are already
                                            perfectly distributed
                                            I would be grateful
                                            for any help!

                                            Best regards
                                            Philipp Krause

 file created at: 2018/04/16 01:16:43
Running on machine: vm-cluster-node1
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
E0416 01:16:43.556149 17897 logging.cc:126] stderr will be logged to this file.
log4j:WARN No appenders could be found for logger 
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError: getCatalogObjects
. Impalad exiting.
*** Check failure stack trace: ***
    @          0x1b9be5d  google::LogMessage::Fail()
    @          0x1b9d702  google::LogMessage::SendToLog()
    @          0x1b9b837  google::LogMessage::Flush()
    @          0x1b9edfe  google::LogMessageFatal::~LogMessageFatal()
    @           0x8300f2  impala::Catalog::Catalog()
    @           0x814319  impala::CatalogServer::Start()
    @           0x80d044  CatalogdMain()
    @           0x7d95a6  main
    @     0x7f8f74b3776d  __libc_start_main
    @           0x80c46d  (unknown)
Picked up JAVA_TOOL_OPTIONS: -Xms204472320 -Xmx204472320 
Wrote minidump to 

Reply via email to