[jira] [Commented] (PHOENIX-3664) Pyspark: pushing filter by date against apache phoenix
[ https://issues.apache.org/jira/browse/PHOENIX-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869655#comment-15869655 ]

Pablo Castilla commented on PHOENIX-3664:
-----------------------------------------

In the end we have switched from Python to Scala and everything seems to work at very good speed. We haven't found phoenixTableAsRDD in Python. We would prefer Python, as we use it in our machine learning implementations, but the two APIs are very similar, so moving to Scala is not a big deal.

> Pyspark: pushing filter by date against apache phoenix
> ------------------------------------------------------
>
>                 Key: PHOENIX-3664
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-3664
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.7.0
>        Environment: Azure HDInsight (HDI 3.5) - pyspark using phoenix client. (Spark 1.6.3 - HBase 1.1.2 under HDP 2.5)
>           Reporter: Pablo Castilla
>
> I am trying to filter by date in Apache Phoenix from pyspark. The column in Phoenix is created as DATE and the filter is a datetime. When I use explain I see Spark doesn't push the filter to Phoenix. I have tried a lot of combinations without luck.
> Any way to do it?
> df = sqlContext.read \
>     .format("org.apache.phoenix.spark") \
>     .option("table", "TABLENAME") \
>     .option("zkUrl", zookepperServer + ":2181:/hbase-unsecure") \
>     .load()
> print(df.printSchema())
> startValidation = datetime.datetime.now()
> print(df.filter(df['FH'] > startValidation).explain(True))
>
> Results:
>
> root
>  |-- METER_ID: string (nullable = true)
>  |-- FH: date (nullable = true)
> None
>
> == Parsed Logical Plan ==
> 'Filter (FH#53 > 1486726683446150)
> +- Relation[METER_ID#52,FH#53,SUMMERTIME#54,MAGNITUDE#55,SOURCE#56,ENTRY_DATETIME#57,BC#58,T_VAL_AE#59,T_VAL_AI#60,T_VAL_R1#61,T_VAL_R2#62,T_VAL_R3#63,T_VAL_R4#64] PhoenixRelation(DAILYREADS,10.0.0.13:2181:/hbase-unsecure)
>
> == Analyzed Logical Plan ==
> METER_ID: string, FH: date, SUMMERTIME: string, MAGNITUDE: int, SOURCE: int, ENTRY_DATETIME: date, BC: string, T_VAL_AE: int, T_VAL_AI: int, T_VAL_R1: int, T_VAL_R2: int, T_VAL_R3: int, T_VAL_R4: int
> Filter (cast(FH#53 as string) > cast(1486726683446150 as string))
> +- Relation[METER_ID#52,FH#53,SUMMERTIME#54,MAGNITUDE#55,SOURCE#56,ENTRY_DATETIME#57,BC#58,T_VAL_AE#59,T_VAL_AI#60,T_VAL_R1#61,T_VAL_R2#62,T_VAL_R3#63,T_VAL_R4#64] PhoenixRelation(DAILYREADS,10.0.0.13:2181:/hbase-unsecure)
>
> == Optimized Logical Plan ==
> Filter (cast(FH#53 as string) > 2017-02-10 11:38:03.44615)
> +- Relation[METER_ID#52,FH#53,SUMMERTIME#54,MAGNITUDE#55,SOURCE#56,ENTRY_DATETIME#57,BC#58,T_VAL_AE#59,T_VAL_AI#60,T_VAL_R1#61,T_VAL_R2#62,T_VAL_R3#63,T_VAL_R4#64] PhoenixRelation(DAILYREADS,10.0.0.13:2181:/hbase-unsecure)
>
> == Physical Plan ==
> Filter (cast(FH#53 as string) > 2017-02-10 11:38:03.44615)
> +- Scan PhoenixRelation(DAILYREADS,10.0.0.13:2181:/hbase-unsecure)[METER_ID#52,FH#53,SUMMERTIME#54,MAGNITUDE#55,SOURCE#56,ENTRY_DATETIME#57,BC#58,T_VAL_AE#59,T_VAL_AI#60,T_VAL_R1#61,T_VAL_R2#62,T_VAL_R3#63,T_VAL_R4#64]
> None
>
> If I set the FH column as TIMESTAMP it pushes the filter but throws an exception:
>
> Caused by:
> org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "12" at line 1, column 219.
> 	at org.apache.phoenix.exception.PhoenixParserException.newException(PhoenixParserException.java:33)
> 	at org.apache.phoenix.parse.SQLParser.parseStatement(SQLParser.java:111)
> 	at org.apache.phoenix.jdbc.PhoenixStatement$PhoenixStatementParser.parseStatement(PhoenixStatement.java:1280)
> 	at org.apache.phoenix.jdbc.PhoenixStatement.parseStatement(PhoenixStatement.java:1363)
> 	at org.apache.phoenix.jdbc.PhoenixStatement.compileQuery(PhoenixStatement.java:1373)
> 	at org.apache.phoenix.jdbc.PhoenixStatement.optimizeQuery(PhoenixStatement.java:1368)
> 	at org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(PhoenixInputFormat.java:122)
> 	... 102 more
> Caused by: MismatchedTokenException(106!=129)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.recoverFromMismatchedToken(PhoenixSQLParser.java:360)
> 	at org.apache.phoenix.shaded.org.antlr.runtime.BaseRecognizer.match(BaseRecognizer.java:115)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.not_expression(PhoenixSQLParser.java:6862)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.and_expression(PhoenixSQLParser.java:6677)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.or_expression(PhoenixSQLParser.java:6614)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.expression(PhoenixSQLParser.java:6579)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.single_select(PhoenixSQLParser.java:4615)
> 	at org.apache.phoenix.parse.PhoenixSQLParser.unioned_selects(PhoenixSQLParser.java:4697)
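[Editor's note] The "Expecting RPAREN" error is consistent with the timestamp being rendered as an unquoted literal into the generated Phoenix SQL (the optimized plan above shows the bare value 2017-02-10 11:38:03.44615): the parser stops at the space before the time-of-day. A hedged sketch of the difference; the exact generated query text is an assumption, not taken from Phoenix source:

```python
from datetime import datetime

# Hypothetical illustration: an unquoted timestamp literal contains a space,
# which ends the token and trips the Phoenix parser, while a quoted
# TO_TIMESTAMP(...) literal parses cleanly.
ts = datetime(2017, 2, 10, 11, 38, 3, 446150)

bad = f"SELECT * FROM DAILYREADS WHERE FH > {ts}"
# ...WHERE FH > 2017-02-10 11:38:03.446150  (unquoted: syntax error)

good = f"SELECT * FROM DAILYREADS WHERE FH > TO_TIMESTAMP('{ts:%Y-%m-%d %H:%M:%S.%f}')"
print(good)
```

TO_TIMESTAMP is a standard Phoenix built-in; the table and column names are taken from the plan above.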
[jira] [Commented] (PHOENIX-3664) Pyspark: pushing filter by date against apache phoenix
[ https://issues.apache.org/jira/browse/PHOENIX-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866161#comment-15866161 ]

Josh Mahonin commented on PHOENIX-3664:
---------------------------------------

Hi [~pablo.castellanos]

I'm able to reproduce this issue on a local cluster, but I'm unable to do so within the Phoenix unit tests. I think the issue is the same as PHOENIX-3540, which is fixed in the unreleased Phoenix 4.10. Seeing as you're using a vendor-supplied Phoenix, you may have some success filing a support request with them for a patched version.

As a temporary workaround, you could look at using the RDD integration instead. Something like this should work:

{code}
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sv = new java.util.Date
val phoenixRDD = sc.phoenixTableAsRDD(
  table = "PCV2",
  columns = Seq("METER_ID", "FH", ..., "VAL_R4"),
  predicate = Some(s"""FH < TO_DATE('${sv.getTime}', 'S')"""),
  zkUrl = Some("10.0.0.11:2181:/hbase-unsecure")
)
{code}

Note that the 'predicate' value effectively takes a literal string and passes it directly to Phoenix after a 'WHERE' clause. In this instance it should translate into a query like:

{{SELECT METER_ID, FH, ..., VAL_R4 FROM PCV2 WHERE FH < TO_DATE('1487092208672', 'S')}}

Please keep this ticket updated with your eventual solution, thanks!
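[Editor's note] On the PySpark side, the same epoch-millisecond predicate string can be built in Python and handed to any integration that accepts a raw WHERE fragment. A minimal sketch, assuming the TO_DATE('millis', 'S') form shown in the workaround above; the helper name is hypothetical:

```python
from datetime import datetime, timezone

def phoenix_date_predicate(column, op, dt):
    """Build a Phoenix WHERE fragment comparing a DATE column against a
    Python datetime, via epoch milliseconds and TO_DATE(..., 'S')."""
    millis = int(dt.timestamp() * 1000)
    return f"{column} {op} TO_DATE('{millis}', 'S')"

sv = datetime(2017, 2, 14, 18, 10, 8, tzinfo=timezone.utc)
print(phoenix_date_predicate("FH", "<", sv))
# FH < TO_DATE('1487095808000', 'S')
```

Passing the literal through Phoenix's own TO_DATE avoids any Spark-side rendering of the date value, which is exactly where the pushdown breaks.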
[jira] [Commented] (PHOENIX-3664) Pyspark: pushing filter by date against apache phoenix
[ https://issues.apache.org/jira/browse/PHOENIX-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866020#comment-15866020 ]

Pablo Castilla commented on PHOENIX-3664:
-----------------------------------------

Hi Josh,

Thanks for helping. I have tried what you suggested and I see errors, but different ones than with Python. With FH set as DATE, the filter is pushed to Phoenix.

The table is created with:

{code}
CREATE TABLE IF NOT EXISTS PCV2 (
    METER_ID VARCHAR(13) NOT NULL,
    FH DATE NOT NULL,
    SUMMERTIME VARCHAR(1),
    MAGNITUDE INTEGER,
    ENTRY_DATETIME DATE,
    BC VARCHAR(2),
    VAL_AE INTEGER,
    VAL_AI INTEGER,
    VAL_R1 INTEGER,
    VAL_R2 INTEGER,
    VAL_R3 INTEGER,
    VAL_R4 INTEGER
    CONSTRAINT pk PRIMARY KEY (METER_ID, FH)
) COMPRESSION='GZ';
{code}

The Scala code is the following:

{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import java.util.Date

val sqlContext = new SQLContext(sc)
val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "PCV2", "zkUrl" -> "10.0.0.11:2181:/hbase-unsecure", "dateAsTimestamp" -> "true")
)
println(df.printSchema())
{code}

{code}
root
 |-- METER_ID: string (nullable = true)
 |-- FH: date (nullable = true)
 |-- SUMMERTIME: string (nullable = true)
 |-- MAGNITUDE: integer (nullable = true)
 |-- ENTRY_DATETIME: date (nullable = true)
 |-- BC: string (nullable = true)
 |-- VAL_AE: integer (nullable = true)
 |-- VAL_AI: integer (nullable = true)
 |-- VAL_R1: integer (nullable = true)
 |-- VAL_R2: integer (nullable = true)
 |-- VAL_R3: integer (nullable = true)
 |-- VAL_R4: integer (nullable = true)
{code}

{code}
val startValidation = new java.sql.Date(System.currentTimeMillis())
// startValidation: java.sql.Date = 2017-02-14

df.filter(df("FH") > startValidation).explain(true)

== Physical Plan ==
Filter (FH#735 > 17211)
+- Scan PhoenixRelation(PCV2,10.0.0.11:2181:/hbase-unsecure)[METER_ID#734,FH#735,SUMMERTIME#736,MAGNITUDE#737,ENTRY_DATETIME#738,BC#739,VAL_AE#740,VAL_AI#741,VAL_R1#742,VAL_R2#743,VAL_R3#744,VAL_R4#745] PushedFilters: [GreaterThan(FH,2017-02-14)]
{code}
{code}
df.filter(df("FH") > startValidation).count()
{code}

{code}
Caused by: org.apache.phoenix.schema.TypeMismatchException: ERROR 203 (22005): Type mismatch. DATE and BIGINT for FH > 2001
	at org.apache.phoenix.schema.TypeMismatchException.newException(TypeMismatchException.java:53)
	at org.apache.phoenix.expression.ComparisonExpression.create(ComparisonExpression.java:133)
	at org.apache.phoenix.compile.ExpressionCompiler.visitLeave(ExpressionCompiler.java:228)
	at org.apache.phoenix.compile.ExpressionCompiler.visitLeave(ExpressionCompiler.java:141)
	at org.apache.phoenix.parse.ComparisonParseNode.accept(ComparisonParseNode.java:47)
	at org.apache.phoenix.compile.WhereCompiler.compile(WhereCompiler.java:130)
	at org.apache.phoenix.compile.WhereCompiler.compile(WhereCompiler.java:100)
	at org.apache.phoenix.compile.QueryCompiler.compileSingleFlatQuery(QueryCompiler.java:558)
	at org.apache.phoenix.compile.QueryCompiler.compileSingleQuery(QueryCompiler.java:510)
	at org.apache.phoenix.compile.QueryCompiler.compileSelect(QueryCompiler.java:205)
	at org.apache.phoenix.compile.QueryCompiler.compile(QueryCompiler.java:160)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:404)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:378)
	at org.apache.phoenix.jdbc.PhoenixStatement.compileQuery(PhoenixStatement.java:1381)
	at org.apache.phoenix.jdbc.PhoenixStatement.compileQuery(PhoenixStatement.java:1374)
	at org.apache.phoenix.jdbc.PhoenixStatement.optimizeQuery(PhoenixStatement.java:1368)
	at org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(PhoenixInputFormat.java:122)
{code}

Also, if I set the FH field as TIMESTAMP I get the same error as with Python:

{code}
Caused by: org.apache.phoenix.exception.PhoenixParserException: ERROR 604 (42P00): Syntax error. Mismatched input. Expecting "RPAREN", got "15" at line 1, column 53.
	at org.apache.phoenix.exception.PhoenixParserException.newException(PhoenixParserException.java:33)
	at org.apache.phoenix.parse.SQLParser.parseStatement(SQLParser.java:111)
	at org.apache.phoenix.jdbc.PhoenixStatement$PhoenixStatementParser.parseStatement(PhoenixStatement.java:1280)
	at org.apache.phoenix.jdbc.PhoenixStatement.parseStatement(PhoenixStatement.java:1363)
	at org.apache.phoenix.jdbc.PhoenixStatement.compileQuery(PhoenixStatement.java:1373)
	at org.apache.phoenix.jdbc.PhoenixStatement.optimizeQuery(PhoenixStatement.java:1368)
	at org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(PhoenixInputFormat.java:122)
	... 180 more
Caused by: MismatchedTokenException(106!=129)
{code}
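[Editor's note] The 17211 in the pushed filter above is not arbitrary: Spark SQL encodes DateType values internally as days since the Unix epoch, and 2017-02-14 is day 17211. Phoenix then receives that bare integer compared against a DATE column, which matches the "DATE and BIGINT" type-mismatch error. A quick check of the encoding:

```python
from datetime import date

# Spark SQL's internal DateType representation is days since 1970-01-01.
# "Filter (FH#735 > 17211)" is that encoding of 2017-02-14, which Phoenix
# sees as a BIGINT, hence the DATE/BIGINT TypeMismatchException.
spark_days = (date(2017, 2, 14) - date(1970, 1, 1)).days
print(spark_days)  # 17211
```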
[jira] [Commented] (PHOENIX-3664) Pyspark: pushing filter by date against apache phoenix
[ https://issues.apache.org/jira/browse/PHOENIX-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863720#comment-15863720 ]

Josh Mahonin commented on PHOENIX-3664:
---------------------------------------

Hi [~pablo.castellanos]

I've not seen this before, although I wonder if there are perhaps a few issues at play:

1) Some sort of date translation issue between Python datetime, pySpark and phoenix-spark
2) An issue with how Spark treats the 'java.sql.Date' type, and how Phoenix stores it internally

Re: 1) Is it possible to attempt a similar code block using Scala in the spark-shell? I think it should be pretty much the same code, just replace {{datetime.datetime.now}} with {{System.currentTimeMillis}}

Re: 2) You might have some success passing the 'dateAsTimestamp' flag to Spark. Effectively Spark truncates the HH:MM:SS part of a date off, even though it is present in the Phoenix data type. I wonder if pyspark is doing anything strange with that.
https://github.com/apache/phoenix/blob/a0e5efcec5a1a732b2dce9794251242c3d66eea6/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala#L622-L633
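[Editor's note] The truncation in point 2 can be seen without a cluster: Spark's DateType keeps only the calendar date, so any time-of-day stored in a Phoenix DATE column is dropped unless dateAsTimestamp maps the column to TimestampType instead. A minimal sketch of the analogous truncation in Python:

```python
from datetime import datetime

# A Phoenix DATE can carry time-of-day, but Spark's DateType cannot:
# reading the value as a date keeps only the calendar day.
full = datetime(2017, 2, 10, 11, 38, 3)
truncated = full.date()
print(truncated.isoformat())  # 2017-02-10 -- the 11:38:03 is lost
```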