GridGain Community Edition


Writing Spark Dataframe to Ignite is very slow

  • 1.  Writing Spark Dataframe to Ignite is very slow

    Posted 08-04-2020 07:29 AM
    Hi All,

    I am trying to write a Spark dataframe to Ignite, and it is taking around 6 minutes. When I write the same parquet file to HDFS via Spark, it finishes in around 30 secs.

    Ignite Server Configuration:

    • JVM Props: -Xms30g -Xmx30g -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:+ScavengeBeforeFullGC -XX:+DisableExplicitGC

    • peerClassLoadingEnabled: true
    • dataStorageConfiguration:
      • defaultDataRegionConfiguration:
        • initialSize: 10485760 #10MB
        • persistenceEnabled: true
      • walPath: /tmp/testGridgain/ignite/work/db/wal/
      • walArchivePath: /tmp/testGridgain/ignite/work/db/wal/


    Spark Client Configuration:

    • peerClassLoadingEnabled: true
    • clientMode: true


    Data attributes:

    Size: 1.4 G
    Record count: 6,382,040

    Any pointers?



    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------


  • 2.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-04-2020 05:02 PM
    After a few changes, I see that as soon as I enable persistence, the performance degrades: with persistence enabled, the data load takes approx 6-7 mins, whereas without persistence it completes within minutes. Can someone please point me to documentation on performance tuning with regard to persistence in Ignite?

    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------



  • 3.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 04:06 AM
    Hello!

    Please take a look at Persistence Tuning

    The most important takeaways are:
    • Change WAL mode to LOG_ONLY.
    • Make checkpoints further apart by changing the checkpoint frequency and the checkpoint page buffer size (see the sketch below).
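
    Since you start the node from a config file, the same properties can be set there; if you configure the node programmatically instead, here is a minimal sketch (the numbers are illustrative starting points, not tuned recommendations):

    import org.apache.ignite.configuration.{DataRegionConfiguration, DataStorageConfiguration, IgniteConfiguration, WALMode}

    val storageCfg = new DataStorageConfiguration()
    // LOG_ONLY fsyncs the WAL on checkpoint instead of on every commit.
    storageCfg.setWalMode(WALMode.LOG_ONLY)
    // Interval between checkpoints in milliseconds (illustrative; default is 3 minutes).
    storageCfg.setCheckpointFrequency(5 * 60 * 1000L)

    val regionCfg = new DataRegionConfiguration()
    regionCfg.setName("Default_Region") // example region name
    regionCfg.setPersistenceEnabled(true)
    // A larger checkpoint page buffer absorbs more dirty pages between checkpoints (illustrative size).
    regionCfg.setCheckpointPageBufferSize(2L * 1024 * 1024 * 1024) // 2 GB
    storageCfg.setDefaultDataRegionConfiguration(regionCfg)

    val cfg = new IgniteConfiguration()
    cfg.setDataStorageConfiguration(storageCfg)
    // Pass cfg to Ignition.start(cfg) on the server node.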

    Regards,

    ------------------------------
    Ilya Kasnacheev
    Community Support Specialist
    GridGain
    ------------------------------



  • 4.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 07:32 AM
    Hi Ilya,

    Thanks for the link. I will go through it to update the persistence properties. On a side note, I see an issue with datatype precision when I load a parquet file into Ignite via a Spark Dataframe. Below are the details. Can you please let me know if this is a bug or if I am doing something incorrect?
    I created the Ignite cache with the code below.
    import java.util

    import org.apache.ignite.Ignition
    import org.apache.ignite.binary.BinaryObject
    import org.apache.ignite.cache.{CacheAtomicityMode, CacheMode, QueryEntity}
    import org.apache.ignite.configuration.CacheConfiguration

    val queryEntity = new QueryEntity()
    queryEntity.setKeyType("KeyObj")
    queryEntity.setValueType("ValueObj")
    queryEntity.setTableName("Cache1")

    // Composite primary key: id, cob, start.
    val keyFields = new util.HashSet[String]()
    Set("id", "cob", "start").foreach(keyFields.add)
    queryEntity.setKeyFields(keyFields)

    // All queryable fields and their Java types (insertion order preserved).
    val fields = new util.LinkedHashMap[String, String]()
    Map("id" -> "java.lang.String",
      "cob" -> "java.util.Date",
      "start" -> "java.util.Date",
      "end" -> "java.util.Date",
      "numberField" -> "java.math.BigDecimal").foreach(e => fields.put(e._1, e._2))
    queryEntity.setFields(fields)

    // Declare precision 19, scale 0 for numberField.
    val precisionMap = new util.HashMap[String, Integer]()
    precisionMap.put("numberField", Integer.valueOf(19))
    queryEntity.setFieldsPrecision(precisionMap)

    val scaleMap = new util.HashMap[String, Integer]()
    scaleMap.put("numberField", Integer.valueOf(0))
    queryEntity.setFieldsScale(scaleMap)

    val cache1 = new CacheConfiguration[BinaryObject, BinaryObject]()
    val queryEntities = new util.ArrayList[QueryEntity]()
    queryEntities.add(queryEntity)
    cache1.setQueryEntities(queryEntities)
    cache1.setName("Cache1")
    cache1.setCacheMode(CacheMode.PARTITIONED)
    cache1.setAtomicityMode(CacheAtomicityMode.ATOMIC)
    cache1.setSqlSchema("PUBLIC")

    val ignite = Ignition.start(Constants.CONFIG)
    ignite.addCacheConfiguration(cache1)

    The source parquet file has a field named "numberField" with precision (19,0), but after loading the data into the Ignite cache with the code below, I see that the precision changes to (38,3).
    dfFinal.write
        .format(IgniteDataFrameSettings.FORMAT_IGNITE)
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, Constants.CONFIG)
        .option(IgniteDataFrameSettings.OPTION_TABLE, "Cache1")
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id,cob,start")
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned")
        .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, value = true)
        .option(IgniteDataFrameSettings.OPTION_STREAMER_SKIP_STORE, value = true)
        .mode(SaveMode.Append)
        .save()
    
    #Schema from Source
    root
     |-- id: string (nullable = true)
     |-- cob: date (nullable = true)
     |-- numberField: decimal(19,0) (nullable = true)
     |-- end: timestamp (nullable = true)
     |-- start: timestamp (nullable = false)
    
    
    #Schema from Cache
    root
     |-- NUMBERFIELD: decimal(38,3) (nullable = true)
     |-- END: timestamp (nullable = true)
     |-- ID: string (nullable = false)
     |-- COB: date (nullable = false)
     |-- START: timestamp (nullable = false)
    
    


    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------



  • 5.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 07:40 AM
    Hello!

    I don't think GridGain has a notion of decimal precision: we just store BigDecimal as-is, as opposed to traditional SQL databases, which trim values to the declared precision before storing.

    Regards,

    ------------------------------
    Ilya Kasnacheev
    Community Support Specialist
    GridGain
    ------------------------------



  • 6.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 07:46 AM
    Hi Ilya,

    I am getting the exception below when I try to update the Ignite cache with Spark append mode, which makes me think there are some precision/scale checks, which would mean GridGain does have some way of mandating precision & scale. Can you please help clarify this?

    Job aborted due to stage failure: Task 1 in stage 6.0 failed 4 times, ....
    
    : class org.apache.ignite.internal.processors.query.IgniteSQLException: Value for a column 'NUMBERFIELD' is out of range. Maximum scale : 0, actual scale: 3
    	at org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl.validateKeyAndValue(QueryTypeDescriptorImpl.java:630)
    	at org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan.processRow(UpdatePlan.java:284)
    	at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamQuery0(IgniteH2Indexing.java:711)
    	at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery(IgniteH2Indexing.java:647)
    	at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:2491)
    	at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:2489)
    	at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
    	at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:2919)
    	at org.apache.ignite.internal.processors.query.GridQueryProcessor.streamUpdateQuery(GridQueryProcessor.java:2489)
    	at org.apache.ignite.spark.impl.QueryHelper$$anonfun$org$apache$ignite$spark$impl$QueryHelper$$savePartition$6.apply(QueryHelper.scala:194)
    	at org.apache.ignite.spark.impl.QueryHelper$$anonfun$org$apache$ignite$spark$impl$QueryHelper$$savePartition$6.apply(QueryHelper.scala:187)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    	at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:187)
    	at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:127)
    	at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:126)
    	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:121)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:


    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------



  • 7.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 08:30 AM
    Hello!

    Yes, I can see now that there is an auxiliary precision check.

    I'm not sure that it will be communicated to Spark, though.
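
    If it is not, one possible workaround is to bring the column to the declared scale on the Spark side before writing. A minimal sketch, reusing dfFinal and the column name from your snippets:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DecimalType

    // Cast numberField to the precision/scale declared in the QueryEntity (19, 0),
    // so values arrive with scale 0 and pass the validation check.
    val dfConformed = dfFinal.withColumn("numberField",
      col("numberField").cast(DecimalType(19, 0)))

    Writing dfConformed instead of dfFinal should then avoid the "out of range" error.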

    Regards,

    ------------------------------
    Ilya Kasnacheev
    Community Support Specialist
    GridGain
    ------------------------------



  • 8.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 08:52 AM
    Hi Ilya,

    Apologies, but I did not fully understand your response. Are you saying that this is a bug and there is no workaround, or am I doing something incorrect?

    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------



  • 9.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-06-2020 02:15 PM
    Hi Ilya,

    Just to update: I did find a workaround by casting the column when the data is read from Ignite. This way, when I do any operation on that column in the Ignite Dataframe, it has the correct precision & scale, matching the original source. I think the issue is that Ignite loses the precision & scale during the write, which seems like a bug to me.

    Below is sample code with the cast.
    spark.read
        .format(IgniteDataFrameSettings.FORMAT_IGNITE)
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, Constants.CONFIG)
        .option(IgniteDataFrameSettings.OPTION_TABLE, "Cache1")
        .load()
        .withColumn("numberField", col("numberField").cast(DecimalType(19,0)))


    ------------------------------
    Mandar Joshi
    Software Developer
    ------------------------------



  • 10.  RE: Writing Spark Dataframe to Ignite is very slow

    Posted 08-07-2020 04:39 AM
    Hello!

    We would be grateful if you could throw together a reproducer project for this issue and share it with us.

    Maybe we will be able to figure out what needs to be done about it.

    Regards,

    ------------------------------
    Ilya Kasnacheev
    Community Support Specialist
    GridGain
    ------------------------------