GridGain Community Edition


Trying to load data from Spark dataframe into Ignite

  • 1.  Trying to load data from Spark dataframe into Ignite

    Posted 04-15-2020 11:07 PM
    Hello,
    I am not sure if this is the right forum for questions, but I have been struggling for the past couple of weeks to make the code below work and would be grateful for your help. I am having issues when I try to write a Spark DataFrame into Ignite. I got the code working with the JDBC thin client, but it was very slow. From the GridGain examples, it looks like I don't have to use the JDBC thin client, but the code below throws an exception.
    ========================================================
    Here is my code:

    import org.apache.ignite.spark.IgniteDataFrameSettings;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    // spark is an existing SparkSession; CONFIG is the path to the Ignite XML configuration file
    Dataset<Row> priceDataFrame = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("d:\\DataSet\\NasdaqPricesHeader.csv");

    priceDataFrame.write()
        .format(IgniteDataFrameSettings.FORMAT_IGNITE())
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
        .option(IgniteDataFrameSettings.OPTION_TABLE(), "price2")
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "symbol,pricedate")
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=partitioned")
        .mode(SaveMode.Overwrite) // overwrite the entire table
        .save();
    =================================================================================
    Here is the error:
    [2020-04-16 01:37:46,033][ERROR][Executor task launch worker for task 8][Executor] Exception in task 0.0 in stage 2.0 (TID 8)
    class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to set schema for DB connection for thread [schema="PUBLIC"]
        at org.apache.ignite.internal.processors.query.h2.H2Connection.schema(H2Connection.java:81)
        at org.apache.ignite.internal.processors.query.h2.H2PooledConnection.schema(H2PooledConnection.java:60)
        at org.apache.ignite.internal.processors.query.h2.ConnectionManager.connection(ConnectionManager.java:243)
        at org.apache.ignite.internal.processors.query.h2.QueryParser.parseH2(QueryParser.java:281)
        at org.apache.ignite.internal.processors.query.h2.QueryParser.parse0(QueryParser.java:172)
        at org.apache.ignite.internal.processors.query.h2.QueryParser.parse(QueryParser.java:131)
        at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamerParse(IgniteH2Indexing.java:737)
        at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery(IgniteH2Indexing.java:631)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2462)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2460)
        at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:35)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:2889)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.streamUpdateQuery(GridQueryProcessor.java:2460)
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$org$apache$ignite$spark$impl$QueryHelper$$savePartition$5.apply(QueryHelper.scala:177)
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$org$apache$ignite$spark$impl$QueryHelper$$savePartition$5.apply(QueryHelper.scala:170)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:170)
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:117)
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:116)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:835)
    Caused by: org.h2.jdbc.JdbcSQLSyntaxErrorException: Schema """PUBLIC""" not found [90079-199]
        at org.h2.message.DbException.getJdbcSQLException(DbException.java:574)
        at org.h2.message.DbException.getJdbcSQLException(DbException.java:427)
        at org.h2.message.DbException.get(DbException.java:205)
        at org.h2.message.DbException.get(DbException.java:181)
        at org.h2.engine.Database.getSchema(Database.java:2015)
        at org.h2.engine.Session.setCurrentSchemaName(Session.java:1409)
        at org.h2.jdbc.JdbcConnection.setSchema(JdbcConnection.java:2014)
        at org.apache.ignite.internal.processors.query.h2.H2Connection.schema(H2Connection.java:78)
        ... 29 more
    [2020-04-16 01:37:46,052][ERROR][task-result-getter-0][TaskSetManager] Task 0 in stage 2.0 failed 1 times; aborting job
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 8, localhost, executor driver): class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to set schema for DB connection for thread [schema="PUBLIC"]
        ... (same frames and H2 cause as the executor trace above)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
        at org.apache.ignite.spark.impl.QueryHelper$.saveTable(QueryHelper.scala:116)
        at org.apache.ignite.spark.impl.IgniteRelationProvider.createRelation(IgniteRelationProvider.scala:159)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
        at com.data.SparkLoad.nativeSparkSqlExample(SparkLoad.java:114)
        at com.data.SparkLoad.main(SparkLoad.java:73)
    Caused by: class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to set schema for DB connection for thread [schema="PUBLIC"]
        ... (same frames and H2 cause as the executor trace above)

    ------------------------------
    sharat chelluboina
    Director
    ------------------------------


  • 2.  RE: Trying to load data from Spark dataframe into Ignite

    Posted 04-17-2020 03:43 AM

    Hi,

    It looks like your table uses a schema different from PUBLIC, which is the default:

    class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to set schema for DB connection for thread [schema="PUBLIC"]
        at org.apache.ignite.internal.processors.query.h2.H2Connection.schema(H2Connection.java:81)
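    The H2 cause in the trace reports the schema as """PUBLIC""". H2 wraps identifiers in double quotes in its messages and, following the SQL rule, doubles any double quote already inside an identifier. One plausible reading (an assumption, not confirmed in this thread) is therefore that the schema name reached H2 as the literal string "PUBLIC", quotes included, rather than as PUBLIC. The stdlib-only Java sketch below illustrates that quoting rule with a hypothetical quoteIdentifier helper; it is not Ignite's or H2's code.

```java
public class IdentifierQuoting {
    // Hypothetical helper: quote an SQL identifier the standard way by wrapping it
    // in double quotes and doubling any double quote it already contains.
    static String quoteIdentifier(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        String once = quoteIdentifier("PUBLIC"); // the plain schema name quoted once
        String twice = quoteIdentifier(once);    // an already-quoted name quoted again
        System.out.println(once);   // prints "PUBLIC"
        System.out.println(twice);  // prints """PUBLIC"""
    }
}
```

    The second line matches the form in the H2 message, which is why a name that already carries quotes is a likely suspect.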

    What you can do:

    1) Check your cache configuration and set the PUBLIC schema there explicitly:

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Person"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="ATOMIC"/>
                <property name="sqlSchema" value="PUBLIC"/>
                <!-- ... -->
            </bean>
        </list>
    </property>

    After that, you can use Spark DataFrames without specifying a schema, because PUBLIC is used by default.

    NOTE: if you don't set the sqlSchema property, the schema name defaults to the cache name (here, "Person") rather than PUBLIC.
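    The NOTE above can be pictured with a toy model. Per the CacheConfiguration.setSqlSchema javadoc, when sqlSchema is not set, the quoted cache name is used as the schema, so a table defined on the Person cache would live in schema "Person" rather than PUBLIC. The stdlib-only sketch below expresses that rule with a hypothetical effectiveSchema helper; it is not Ignite's implementation.

```java
import java.util.Optional;

public class SchemaResolution {
    // Hypothetical helper modelling the documented default: use sqlSchema when it is
    // set, otherwise fall back to the cache name wrapped in double quotes.
    static String effectiveSchema(String cacheName, Optional<String> sqlSchema) {
        return sqlSchema.orElse("\"" + cacheName + "\"");
    }

    public static void main(String[] args) {
        // sqlSchema set to PUBLIC, as in the XML configuration above
        System.out.println(effectiveSchema("Person", Optional.of("PUBLIC"))); // PUBLIC
        // sqlSchema not set: the quoted cache name becomes the schema
        System.out.println(effectiveSchema("Person", Optional.empty()));      // "Person"
    }
}
```

    In the second case, a DataFrame write that assumes PUBLIC would not find the table, which is why the schema has to be either configured as PUBLIC or named explicitly on the Spark side.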

    2) Alternatively, you can set the schema in the Spark DataFrame options:

    .option(IgniteDataFrameSettings.OPTION_SCHEMA(), "SchemaName")

    Note, though, that there seem to be known problems with custom schemas:

    https://issues.apache.org/jira/browse/IGNITE-12141

    Either way, your code should work fine with the PUBLIC schema.

    BR,
    Andrei



    ------------------------------
    Andrei Alexsandrov
    Developer
    GridGain
    ------------------------------



  • 3.  RE: Trying to load data from Spark dataframe into Ignite

    Posted 04-20-2020 10:23 AM
    Andrei,
    Thank you so much for responding to my question.
    I have resolved the issue: I was on an old Ignite version (2.3), and upgrading to 2.8 fixed it.
    Thanks again.

    Regards,
    Sharat





  • 4.  RE: Trying to load data from Spark dataframe into Ignite

    Posted 04-17-2020 06:35 AM
    This was resolved by using Ignite 2.8 from the GridGain package.

    ------------------------------
    sharat chelluboina
    Director
    abc.com
    2035543544
    ------------------------------