Zonal statistics computation on PROBA-V MEP

This Scala notebook demonstrates the combination of a few technologies in the PROBA-V Mission Exploitation Platform:

  1. Geotrellis for fast access to Copernicus data
  2. Use of Spark for distributed processing of raster tiles
  3. Use of the Scala Bokeh bindings to plot results
  4. Loading auxiliary data from HDFS using Geotrellis

Initialization

First of all, we need to do some setup. Because our Scala notebook kernel does not integrate Spark directly, we need to do this ourselves. Other notebook technologies like Apache Zeppelin do provide such a direct integration, but the setup code is fairly simple and can easily be reused in other notebooks.

Note that we currently run Spark in local mode. Using yarn-client should also work, but there we have a version mismatch between specific libraries on the cluster and the ones used locally. This will need to be sorted out if we want to scale out this experiment.

In [1]:
val sparkHome = "/home/driesj/spark-1.6.2-bin-without-hadoop"
val hadoopConfDir = "/etc/hadoop/conf"
val sparkAssembly = "hdfs:///user/driesj/spark-assembly-1.6.2-hadoop2.2.0.jar"
val sparkMaster = "local[4]"
val sparkVersion = "1.6.2"
val hadoopVersion = "2.7.1"
val scalaVersion = scala.util.Properties.versionNumberString
sparkHome: String = "/home/driesj/spark-1.6.2-bin-without-hadoop"
hadoopConfDir: String = "/etc/hadoop/conf"
sparkAssembly: String = "hdfs:///user/driesj/spark-assembly-1.6.2-hadoop2.2.0.jar"
sparkMaster: String = "local[4]"
sparkVersion: String = "1.6.2"
hadoopVersion: String = "2.7.1"
scalaVersion: String = "2.10.6"

A very nice feature of the Scala notebook kernel is that we can simply load dependencies from Maven repositories. This also works with Spark, so there is no need to preinstall anything on the cluster or on the notebook server.

In [2]:
classpath.addPath(hadoopConfDir)

classpath.add(
  
  "com.azavea.geotrellis" %% "geotrellis-spark" % "0.10.1",
  "com.azavea.geotrellis" %% "geotrellis-accumulo" % "0.10.1",
  "com.github.alexarchambault.ammonium" % s"spark_1.6.1_${scalaVersion}" % "0.4.0-M6-1",
  "org.apache.spark" %% "spark-yarn" % sparkVersion,
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-yarn-server-web-proxy" % hadoopVersion
)
classpath.add("io.continuum.bokeh" %% "bokeh" % "0.7")
192 new artifact(s)
193 new artifacts in macro
193 new artifacts in runtime
193 new artifacts in compile
18 new artifact(s)
19 new artifacts in macro
19 new artifacts in runtime
19 new artifacts in compile

In [3]:
//we have a dependency issue with Avro versions when running in yarn-client mode; this line shows which versions
//are on the classpath
classpath.path().filter(f=> f.getName.contains("avro"))
res2: Seq[java.io.File] = List(
  /home/driesj/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/avro/avro/1.8.0/avro-1.8.0.jar,
  /home/driesj/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7.jar,
  /home/driesj/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7-tests.jar,
  /home/driesj/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/avro/avro-mapred/1.7.7/avro-mapred-1.7.7-hadoop2.jar
)

Authentication

The PROBA-V MEP Hadoop cluster is secured with Kerberos. This applies to components such as Accumulo, HDFS and YARN. To authenticate, we need to run kinit and provide our password. The current trick is to start a kinit process from Scala. Luckily, we only need to do this once (until the Kerberos ticket expires), so make sure to remove your password from the cell after authenticating.

In [4]:
import scala.sys.process._
("echo xxx" #| "kinit") .!!
kinit: Password incorrect while getting initial credentials
java.lang.RuntimeException: Nonzero exit value: 1 (Nonzero exit value: 1)
  scala.sys.package$.error(package.scala:27)
  scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:131)
  scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:101)
  cmd3$$user$$anonfun$1.apply(Main.scala:24)
  cmd3$$user$$anonfun$1.apply(Main.scala:23)
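
If a keytab has been provisioned for your principal, authentication can also be scripted without putting a password in the notebook. The snippet below is only a sketch: the keytab path and principal are placeholders, not files that exist on the MEP.

import scala.sys.process._
// authenticate with a keytab instead of piping a password (placeholder path and principal)
Seq("kinit", "-kt", "/home/driesj/driesj.keytab", "driesj@EXAMPLE.REALM").!!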

Spark initialization

We use Ammonite to provide Spark integration for our notebook. We do have to override some settings, because Ammonite assumes that the Spark jars are available on the cluster, which is not the case if we want to use a different version. Currently, the version on the PROBA-V MEP Hadoop cluster is 1.4.1, while we are using 1.6.2.

In [5]:
val Spark = new ammonite.spark.Spark
Spark: ammonite.spark.Spark = Spark(uninitialized)
In [6]:
import Spark.{ sparkConf, sc, sqlContext }
import Spark.{ sparkConf, sc, sqlContext }
In [7]:
//Workaround for https://github.com/geotrellis/geotrellis/issues/1445
//which only occurs when running on the cluster
//exclude avro 1.8.0
private lazy val sparkJars = classpath.resolve(
 //   "org.apache.spark" %% "spark-core" % sparkVersion,
 //   "org.apache.spark" %% "spark-sql" % sparkVersion//,
    "org.apache.avro" % "avro" % "1.8.0"
  ).toSet

//include avro 1.7.7
private lazy val avroJars = classpath.resolve(
    "org.apache.avro" % "avro" % "1.7.7"
  ).toSet
//print(avroJars)

import org.apache.spark.SparkConf

//copy pasted from ammonite spark conf init code
implicit class SparkConfExtensions(val conf: SparkConf) {
      def setIfMissingLazy(key: String, value: => String): conf.type = {
        if (conf.getOption(key).isEmpty)
          conf.set(key, value)
        conf
      }
    }

sparkConf
  .setMaster(sparkMaster)
  .setAppName("CropscapeDemo")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.instances", "4")
  .set("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.7.1")
  .set("spark.home", sparkHome)
  .set("spark.yarn.jar", sparkAssembly)
  //.set("spark.jars",null)
  .set(
        "spark.jars",
        (classpath
          .path()          
          .filter(f => f.isFile && f.getName.endsWith(".jar"))
          .filterNot(sparkJars)
          ++ avroJars
        ) 
          .map(_.toURI.toString)
          .mkString(",")
      )
import org.apache.spark.SparkConf
defined class SparkConfExtensions
res6_4: org.apache.spark.SparkConf = org.apache.spark.SparkConf@3848d03f

This completes the setup of our notebook so we can start using Spark and Geotrellis.
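
As a quick sanity check (a minimal sketch; the SparkContext is created lazily by Ammonite the first time sc is used), we can run a trivial job:

// force lazy SparkContext creation and run a trivial distributed job
val check = sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _)
// expected value of check: 1001000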

Computing Zonal Statistics

The aim of this notebook is to compute a timeseries of biophysical statistics for various crops in a particular region. The biophysical parameter data is provided by the Copernicus Global Land FAPAR product (http://land.copernicus.eu/global/products/fapar). The crop data comes from CropScape (https://nassgeodata.gmu.edu/CropScape/), which provides a yearly land use map for the United States. We'll use an extract of this dataset for the year 2015, in Iowa, because this is a region where the parcels are fairly large, which is a good match for our Copernicus data at 1 km resolution.

To run in local mode, we use a fairly small area of interest. When running on the cluster, this can be larger, but it only makes sense to aggregate statistics over areas with similar characteristics.

The crop data is provided as a GeoTIFF on HDFS, which Geotrellis can read directly from my home directory. The full Copernicus fAPAR V1 timeseries has already been ingested into Accumulo, a key-value store that keeps its data on HDFS. This reduces network overhead when processing the data on the cluster.

In [8]:
import com.github.nscala_time.time.Imports._
import geotrellis.raster.Tile
import geotrellis.raster.resample.{Bilinear, CubicConvolution, NearestNeighbor,Mode}
import geotrellis.spark.io.{Between, Intersects, LayerQuery}
import geotrellis.spark.io.accumulo.AccumuloLayerReader
import geotrellis.spark.{Bounds, KeyBounds, LayerId, Metadata, SpaceTimeKey, SpatialKey, TileLayerMetadata}
import geotrellis.vector.{Extent, ProjectedExtent}
import org.joda.time.DateTime
import geotrellis.spark.io.hadoop._
import geotrellis.spark.tiling._
import geotrellis.spark.tiling.FloatingLayoutScheme
import geotrellis.spark.io.accumulo.{AccumuloInstance, AccumuloLayerReader, AccumuloAttributeStore}
import org.apache.accumulo.core.client.security.tokens.KerberosToken
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import geotrellis.spark.io.json.Implicits.tileLayerMetadataFormat
import geotrellis.spark.io.json.KeyFormats.SpaceTimeKeyFormat

/**
* Loads a mask containing zones from HDFS
**/
def getMask(sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
  {
    sc.hadoopGeoTiffRDD("hdfs:///user/driesj/cropscapedata/CDL_2015.tif")
      
  }

def getLayerIds(): Seq[LayerId] = {
    val token = new KerberosToken()
    val accumulo = AccumuloInstance("hdp-accumulo-instance", "epod6.vgt.vito.be:2181,epod17.vgt.vito.be:2181,epod1.vgt.vito.be:2181", token.getPrincipal, token)
    return AccumuloAttributeStore(accumulo).layerIds
}

/**
*   Load an RDD backed by accumulo, some parameters are still hard coded
**/
def loadBiopar(parameterName: String,extent:Extent,sc:SparkContext) :RDD[(SpaceTimeKey,Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = {
    
    print("acquiring reader...")
    val token = new KerberosToken()
    val accumulo = AccumuloInstance("hdp-accumulo-instance", "epod6.vgt.vito.be:2181,epod17.vgt.vito.be:2181,epod1.vgt.vito.be:2181", token.getPrincipal, token)
    val reader = AccumuloLayerReader(accumulo)(sc)
    print("acquired reader: " + reader)
    reader.attributeStore.layerIds
    val layerId: LayerId = new LayerId(parameterName, 0)
    val query = new LayerQuery[SpaceTimeKey, TileLayerMetadata[SpaceTimeKey]].where(Intersects(extent))
      .where(Between(new DateTime(2015,1,1,0,0,0, DateTimeZone.UTC),new DateTime(2015,12,12,0,0,0, DateTimeZone.UTC)))

    reader.read[SpaceTimeKey, Tile, TileLayerMetadata[SpaceTimeKey]](layerId,query)

  }
import com.github.nscala_time.time.Imports._
import geotrellis.raster.Tile
import geotrellis.raster.resample.{Bilinear, CubicConvolution, NearestNeighbor,Mode}
import geotrellis.spark.io.{Between, Intersects, LayerQuery}
import geotrellis.spark.io.accumulo.AccumuloLayerReader
import geotrellis.spark.{Bounds, KeyBounds, LayerId, Metadata, SpaceTimeKey, SpatialKey, TileLayerMetadata}
import geotrellis.vector.{Extent, ProjectedExtent}
import org.joda.time.DateTime
import geotrellis.spark.io.hadoop._
import geotrellis.spark.tiling._
import geotrellis.spark.tiling.FloatingLayoutScheme
import geotrellis.spark.io.accumulo.{AccumuloInstance, AccumuloLayerReader, AccumuloAttributeStore}
import org.apache.accumulo.core.client.security.tokens.KerberosToken
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import geotrellis.spark.io.json.Implicits.tileLayerMetadataFormat
import geotrellis.spark.io.json.KeyFormats.SpaceTimeKeyFormat
defined function getMask
defined function getLayerIds
defined function loadBiopar
In [9]:
//some utility functions to write an RDD to a Geotiff, for debugging purposes.
import geotrellis.raster.{io=>_,_}
import geotrellis.raster.io.geotiff._
import geotrellis.spark.stitch._
import geotrellis.spark.{io=>_,_}

def toFile(filename:String,rdd:TileLayerRDD[SpatialKey] ) =
{
    val raster: Raster[Tile] = rdd.stitch
    GeoTiff(raster, rdd.metadata.crs).write(filename)    
}

def toFile2(filename:String,rdd:RDD[(SpatialKey,Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]) =
{
    val raster: Raster[Tile] = rdd.stitch
    GeoTiff(raster, rdd.metadata.crs).write(filename)    
}
import geotrellis.raster.{io=>_,_}
import geotrellis.raster.io.geotiff._
import geotrellis.spark.stitch._
import geotrellis.spark.{io=>_,_}
defined function toFile
defined function toFile2

Loading the RDDs

Now we can load our mask RDD and extract its metadata. The metadata contains the extent, which is used to create an RDD that contains the fAPAR tiles that overlap with our mask.

In [10]:
val maskRDD: RDD[(ProjectedExtent, Tile)] = getMask(sc)

val layoutScheme = ZoomedLayoutScheme(maskRDD.first._1.crs ,256,0.1)

val (_: Int, metadata: TileLayerMetadata[SpatialKey]) = TileLayerMetadata.fromRdd(maskRDD, layoutScheme)
maskRDD: org.apache.spark.rdd.RDD[(geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile)] = NewHadoopRDD[0] at newAPIHadoopRDD at HadoopSparkContextMethods.scala:34
layoutScheme: geotrellis.spark.tiling.ZoomedLayoutScheme = geotrellis.spark.tiling.ZoomedLayoutScheme@650025ac
metadata: geotrellis.spark.TileLayerMetadata[geotrellis.spark.SpatialKey] = TileLayerMetadata(uint8raw,LayoutDefinition(Extent(-180.0, -89.99999, 179.99999, 89.99999),TileLayout(8192,8192,256,256)),Extent(-92.955516, 41.612908, -92.835898, 41.695655),geotrellis.proj4.CRS$$anon$3@41d0d1b7,KeyBounds(SpatialKey(1980,2198),SpatialKey(1983,2202)))
In [11]:
print(getLayerIds)
val bioParRDD: RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = loadBiopar("BioPar_FAPAR_V1_Tiles", metadata.extent,sc)
List(Layer(name = "BioPar_FAPAR_V1_Tiles", zoom = 0), Layer(name = "PROBAV_L3_S10_TOC_NDVI_333M", zoom = 0), Layer(name = "PROBAV_L3_S10_TOC_RED_333M", zoom = 0))acquiring reader...acquired reader: geotrellis.spark.io.accumulo.AccumuloLayerReader@77ee042
bioParRDD: org.apache.spark.rdd.RDD[(geotrellis.spark.SpaceTimeKey, geotrellis.raster.Tile)] with geotrellis.spark.Metadata[geotrellis.spark.TileLayerMetadata[geotrellis.spark.SpaceTimeKey]] = ContextRDD[5] at RDD at ContextRDD.scala:19
In [ ]:

In [12]:
val bounds: Bounds[SpaceTimeKey] = bioParRDD.metadata.bounds
bounds: geotrellis.spark.Bounds[geotrellis.spark.SpaceTimeKey] = KeyBounds(
  SpaceTimeKey(0, 0, 914457600000L),
  SpaceTimeKey(78, 32, 1466640000000L)
)

Reformatting our mask

The mask data has a higher resolution than the Copernicus data, and has a different layout. Therefore, we have to resample and retile our mask so that it matches the Copernicus data exactly. A different approach would be to resample the Copernicus data itself, which may be better because it avoids losing resolution in the mask data.

In [13]:
import org.apache.spark.HashPartitioner
// Here we set some options for our tiling.
val tilerOptions =
Tiler.Options(
  resampleMethod = Mode,
  partitioner = new HashPartitioner(10)
)
val tiledRdd = maskRDD.tileToLayout[SpatialKey](new UByteUserDefinedNoDataCellType(0),bioParRDD.metadata.layout, tilerOptions)
import org.apache.spark.HashPartitioner
tilerOptions: geotrellis.spark.tiling.Tiler.Options = Options(Mode, Some(org.apache.spark.HashPartitioner@a))
tiledRdd: org.apache.spark.rdd.RDD[(geotrellis.spark.SpatialKey, geotrellis.raster.Tile)] = ShuffledRDD[7] at reduceByKey at TileRDDMerge.scala:32
In [14]:
//val retiledBiopar = bioParRDD.tileToLayout[SpatialKey](metadata.cellType,metadata.layout, tilerOptions)
//toFile2("fapar2.tiff",retiledBiopar.toSpatial(1420243200000L))

Running zonal statistics for each time instant

The Copernicus RDD is in fact a timeseries. Running the Geotrellis zonalHistogram method on it would aggregate the histograms over all dates, which is not what we want. Therefore, we simply loop over the available time instants and run zonalHistogram on an RDD that is filtered on each instant, as sketched below. This approach may have to be revised for scalability at some point.
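
In condensed form, the per-date loop looks roughly like this (a sketch only; timeKeys, bioParRDD and tiledMaskRdd are constructed in the cells below, and the full implementation, including mean extraction and nodata handling, is the computeMeanValues function further down):

import geotrellis.spark.mapalgebra.zonal._
// one zonal histogram per time instant, keyed by the raw instant
val histogramsPerDate = timeKeys.map { key =>
  key.instant -> bioParRDD.toSpatial(key.instant).zonalHistogram(tiledMaskRdd)
}.toMap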

We first determine the distinct timestamps in the BioPar timeseries RDD. The timestamp of a Copernicus product in fact indicates a ten-day period, and different conventions are in use, so it's best to extract the actual timestamps from our RDD. The methods to do this are provided by the Spark API, so all of this happens in a distributed manner.

In [15]:
val unique: RDD[TemporalKey] = bioParRDD.keys.map(k => k.temporalKey).distinct().cache()
val timeKeys = unique.collect.sortWith(_.instant < _.instant)
unique: org.apache.spark.rdd.RDD[geotrellis.spark.TemporalKey] = MapPartitionsRDD[12] at distinct at Main.scala:157
timeKeys: Array[geotrellis.spark.TemporalKey] = Array(
  TemporalKey(1420243200000L),
  TemporalKey(1421107200000L),
  TemporalKey(1422057600000L),
  TemporalKey(1422921600000L),
  TemporalKey(1423785600000L),
  TemporalKey(1424476800000L),
  TemporalKey(1425340800000L),
  TemporalKey(1426204800000L),
  TemporalKey(1427155200000L),
  TemporalKey(1428019200000L),
  TemporalKey(1428883200000L),
  TemporalKey(1429747200000L),
  TemporalKey(1430611200000L),
  TemporalKey(1431475200000L),
  TemporalKey(1432425600000L),
  TemporalKey(1433289600000L),
  TemporalKey(1434153600000L),
  TemporalKey(1435017600000L),
  TemporalKey(1435881600000L),
...
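
The raw instants above are epoch milliseconds; purely for readability, they can be converted with the Joda-Time classes that are already on the classpath (a small optional sketch, not needed for the computation):

import org.joda.time.{DateTime, DateTimeZone}
// render the epoch-millisecond instants as UTC dates
val readableDates = timeKeys.map(k => new DateTime(k.instant, DateTimeZone.UTC).toString("yyyy-MM-dd"))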
In [16]:
// At this point, we want to combine our RDD and our Metadata to get a TileLayerRDD[SpatialKey]
val tiledMaskRdd: TileLayerRDD[SpatialKey] = ContextRDD(tiledRdd, metadata)
toFile("mask.tiff",tiledMaskRdd )
tiledMaskRdd: geotrellis.spark.TileLayerRDD[geotrellis.spark.SpatialKey] = ContextRDD[13] at RDD at ContextRDD.scala:19
In [37]:
import geotrellis.raster.histogram._
import geotrellis.spark.mapalgebra.zonal._

def computeMeanValues(timeseriesRDD: RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]],zonesRDD:TileLayerRDD[SpatialKey],temporalKeys:Seq[TemporalKey],zoneId:Int = 5): (List[Double],Map[Long,Histogram[Int]])={
    var meanValues = List[Double]()
    var histograms = new scala.collection.mutable.HashMap[Long, Histogram[Int]]
    for( key <- temporalKeys ) {
        val spatialRDD = timeseriesRDD.toSpatial(key.instant )
        
        val histogram: Map[Int, Histogram[Int]] = spatialRDD.zonalHistogram(zonesRDD)
        val histo : MutableHistogram[Int] = histogram.get(zoneId).toList(0).mutable()
        histograms(key.instant) =  histo
        //it appears that Geotrellis does not filter out the nodata value when computing the mean statistics for a histogram,
        //so we filter it out manually; normal fAPAR values are all > 0.0
        if(!histo.minValue.isEmpty && histo.minValue.get < 0.0){
            histo.uncountItem(histo.minValue.get )
        }
        var mean = 0.0
        if(!histo.statistics.isEmpty){
            mean = histo.statistics.get.mean/250
        }

        //filter out negative means
        if(mean < 0 ){
            meanValues = 0. :: meanValues
        }else{
            meanValues = mean :: meanValues 
        }

    }
    (meanValues,histograms.toMap)
}
import geotrellis.raster.histogram._
import geotrellis.spark.mapalgebra.zonal._
defined function computeMeanValues
In [45]:
//compute mean for fAPAR
val (meanValues,soyFaparHistograms) = computeMeanValues(bioParRDD,tiledMaskRdd,timeKeys )
meanValues: List[Double] = List(
  0.09469767441860465,
  0.13888372093023255,
  0.1735813953488372,
  0.16353488372093022,
  0.19088372093023254,
  0.2952558139534883,
  0.33293023255813964,
  0.5071627906976743,
  0.6929302325581396,
  0.7933953488372093,
  0.8006486486486485,
  0.7677894736842104,
  0.7764,
  0.7892727272727273,
  0.8027142857142858,
  0.8319069767441859,
  0.6944186046511628,
  0.5801860465116279,
  0.3759069767441861,
...
soyFaparHistograms: Map[Long,geotrellis.raster.histogram.Histogram[Int]] = Map(
  1445644800000L -> geotrellis.raster.histogram.FastMapHistogram@e1787fe,
  1442966400000L -> geotrellis.raster.histogram.FastMapHistogram@1389f144,
  1441238400000L -> geotrellis.raster.histogram.FastMapHistogram@6a8af421,
  1429747200000L -> geotrellis.raster.histogram.FastMapHistogram@6722143e,
  1443830400000L -> geotrellis.raster.histogram.FastMapHistogram@55384753,
  1432425600000L -> geotrellis.raster.histogram.FastMapHistogram@9679f61,
  1422057600000L -> geotrellis.raster.histogram.FastMapHistogram@34f14990,
  1446508800000L -> geotrellis.raster.histogram.FastMapHistogram@430538d4,
  1428883200000L -> geotrellis.raster.histogram.FastMapHistogram@5490fd38,
  1433289600000L -> geotrellis.raster.histogram.FastMapHistogram@6139fc5c,
  1447372800000L -> geotrellis.raster.histogram.FastMapHistogram@5fba8422,
  1449100800000L -> geotrellis.raster.histogram.FastMapHistogram@3593d499,
  1440374400000L -> geotrellis.raster.histogram.FastMapHistogram@6639862e,
  1420243200000L -> geotrellis.raster.histogram.FastMapHistogram@484c4da9,
  1435017600000L -> geotrellis.raster.histogram.FastMapHistogram@435c5b16,
  1422921600000L -> geotrellis.raster.histogram.FastMapHistogram@684078f8,
  1423785600000L -> geotrellis.raster.histogram.FastMapHistogram@42e28b28,
  1428019200000L -> geotrellis.raster.histogram.FastMapHistogram@6869b7d8,
  1442102400000L -> geotrellis.raster.histogram.FastMapHistogram@494cd528,
...
In [19]:
val meanCornFaparValues = computeMeanValues(bioParRDD,tiledMaskRdd,timeKeys,1 )._1
meanCornFaparValues: List[Double] = List(
  0.10207999999999999,
  0.13584615384615384,
  0.17238461538461536,
  0.1652307692307693,
  0.19361538461538463,
  0.30030769230769233,
  0.3372307692307693,
  0.5040769230769232,
  0.6888461538461539,
  0.7894615384615384,
  0.8086222222222221,
  0.7864347826086958,
  0.7875555555555556,
  0.7995384615384614,
  0.812125,
  0.8351372549019609,
  0.709153846153846,
  0.589692307692308,
  0.3968461538461539,
...

Now that everything's working, we can apply the same code to PROBA-V 300M products, which have a higher resolution.

In [20]:
//let's repeat everything, but now for PROBA-V 300M NDVI
import org.apache.spark.HashPartitioner
// Here we set some options for our tiling.
val tilerOptions =
Tiler.Options(
  resampleMethod = Mode,
  partitioner = new HashPartitioner(10)
)
val ndviRDD: RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = loadBiopar("PROBAV_L3_S10_TOC_NDVI_333M", metadata.extent,sc)
val maskRddNDVI = maskRDD.tileToLayout[SpatialKey](new UByteUserDefinedNoDataCellType(0),ndviRDD.metadata.layout, tilerOptions)
acquiring reader...acquired reader: geotrellis.spark.io.accumulo.AccumuloLayerReader@5f754616
import org.apache.spark.HashPartitioner
tilerOptions: geotrellis.spark.tiling.Tiler.Options = Options(Mode, Some(org.apache.spark.HashPartitioner@a))
ndviRDD: org.apache.spark.rdd.RDD[(geotrellis.spark.SpaceTimeKey, geotrellis.raster.Tile)] with geotrellis.spark.Metadata[geotrellis.spark.TileLayerMetadata[geotrellis.spark.SpaceTimeKey]] = ContextRDD[425] at RDD at ContextRDD.scala:19
maskRddNDVI: org.apache.spark.rdd.RDD[(geotrellis.spark.SpatialKey, geotrellis.raster.Tile)] = ShuffledRDD[427] at reduceByKey at TileRDDMerge.scala:32
In [21]:
val ndviTemporalRDD: RDD[TemporalKey] = ndviRDD.keys.map(k => k.temporalKey).distinct().cache()
val timeKeysNDVI = ndviTemporalRDD.collect.sortWith(_.instant < _.instant)
ndviTemporalRDD: org.apache.spark.rdd.RDD[geotrellis.spark.TemporalKey] = MapPartitionsRDD[432] at distinct at Main.scala:157
timeKeysNDVI: Array[geotrellis.spark.TemporalKey] = Array(
  TemporalKey(1420070400000L),
  TemporalKey(1420934400000L),
  TemporalKey(1421798400000L),
  TemporalKey(1422748800000L),
  TemporalKey(1423612800000L),
  TemporalKey(1424476800000L),
  TemporalKey(1425168000000L),
  TemporalKey(1426032000000L),
  TemporalKey(1426896000000L),
  TemporalKey(1427846400000L),
  TemporalKey(1428710400000L),
  TemporalKey(1429574400000L),
  TemporalKey(1430438400000L),
  TemporalKey(1431302400000L),
  TemporalKey(1432166400000L),
  TemporalKey(1433116800000L),
  TemporalKey(1433980800000L),
  TemporalKey(1434844800000L),
  TemporalKey(1435708800000L),
...
In [40]:
val (meanNDVIValues,soyNDVIHistograms) = computeMeanValues(ndviRDD,ContextRDD(maskRddNDVI,metadata),timeKeysNDVI)
meanNDVIValues: List[Double] = List(
  0.31273684210526315,
  0.3178736842105263,
  0.11147368421052631,
  0.3455368421052633,
  0.33037894736842094,
  0.3445894736842106,
  0.3727157894736842,
  0.42818947368421056,
  0.4622315789473685,
  0.7758947368421054,
  0.9369894736842105,
  0.9887157894736841,
  0.9826736842105263,
  0.9818947368421052,
  0.9956421052631579,
  0.9858315789473685,
  0.8720842105263156,
  0.6136000000000001,
  0.7593684210526316,
...
soyNDVIHistograms: Map[Long,geotrellis.raster.histogram.Histogram[Int]] = Map(
  1422748800000L -> geotrellis.raster.histogram.FastMapHistogram@5ed84094,
  1448064000000L -> geotrellis.raster.histogram.FastMapHistogram@6e36f43b,
  1428710400000L -> geotrellis.raster.histogram.FastMapHistogram@434e1369,
  1427846400000L -> geotrellis.raster.histogram.FastMapHistogram@1ea5d859,
  1432166400000L -> geotrellis.raster.histogram.FastMapHistogram@265e7f9,
  1423612800000L -> geotrellis.raster.histogram.FastMapHistogram@5b2b5ca2,
  1431302400000L -> geotrellis.raster.histogram.FastMapHistogram@684dc9f5,
  1443657600000L -> geotrellis.raster.histogram.FastMapHistogram@581ab3b5,
  1425168000000L -> geotrellis.raster.histogram.FastMapHistogram@756dcd60,
  1435708800000L -> geotrellis.raster.histogram.FastMapHistogram@74224123,
  1449792000000L -> geotrellis.raster.histogram.FastMapHistogram@72263c33,
  1436572800000L -> geotrellis.raster.histogram.FastMapHistogram@7f2cbed9,
  1438387200000L -> geotrellis.raster.histogram.FastMapHistogram@64317a91,
  1434844800000L -> geotrellis.raster.histogram.FastMapHistogram@2fc046d,
  1445385600000L -> geotrellis.raster.histogram.FastMapHistogram@41f877a5,
  1444521600000L -> geotrellis.raster.histogram.FastMapHistogram@5b38d335,
  1426032000000L -> geotrellis.raster.histogram.FastMapHistogram@3432cf31,
  1448928000000L -> geotrellis.raster.histogram.FastMapHistogram@568f6ab6,
  1433980800000L -> geotrellis.raster.histogram.FastMapHistogram@54486a9d,
...
In [23]:
val meanCornNDVIValues = computeMeanValues(ndviRDD,ContextRDD(maskRddNDVI,metadata),timeKeysNDVI,1)._1
meanCornNDVIValues: List[Double] = List(
  0.30809569377990437,
  0.313511961722488,
  0.11561722488038276,
  0.35399043062200963,
  0.34650717703349276,
  0.3658947368421053,
  0.41515789473684206,
  0.4648421052631577,
  0.4830430622009569,
  0.7659904306220096,
  0.9224688995215313,
  0.9769186602870814,
  0.9603636363636365,
  0.971464114832536,
  0.9931674641148326,
  0.9853971291866028,
  0.9193492822966507,
  0.6418755980861243,
  0.8762679425837321,
...

Plotting our results

The great thing about notebooks is of course that we can plot our results inline. For IPython, a number of well-documented options exist, but for the Scala kernel the choice seems more limited. Eventually, I found the Scala Bokeh bindings to work well for me.

First we'll need to do some initialization; then we can do the actual plotting.

In [24]:
def initBokeh() = {

    import _root_.io.continuum.bokeh.{Resources, HTMLFragment}
    val resources = Resources.default

    val fragment = new HTMLFragment(scala.xml.NodeSeq.Empty, resources.styles, resources.scripts)
    val writer = new java.io.StringWriter()
    publish.display("","text/html"->fragment.preamble.toString)
}
import _root_.io.continuum.bokeh.Plot
//A utility function to show our plot
def showPlot(plot: Plot) = {
    import _root_.io.continuum.bokeh._
    val panTool = new PanTool().plot(plot)
    val wheelZoomTool = new WheelZoomTool().plot(plot)
    val previewSaveTool = new PreviewSaveTool().plot(plot)
    val resetTool = new ResetTool().plot(plot)
    val resizeTool = new ResizeTool().plot(plot)
    val crosshairTool = new CrosshairTool().plot(plot)

    plot.tools := List(panTool, wheelZoomTool, previewSaveTool, resetTool, resizeTool, crosshairTool)


    val doc = new Document(plot)

    val frag = doc.fragment(Resources.InlineMin)
    publish.display("","text/html"->frag.html.toString)
}
initBokeh()
BokehJS successfully loaded.
defined function initBokeh
import _root_.io.continuum.bokeh.Plot
defined function showPlot
In [35]:
import _root_.io.continuum.bokeh._
object source extends ColumnDataSource {
   val date = column(timeKeys.map( k => k.instant.toDouble))
   val fapar = column(meanValues)
   val ndvi = column(meanNDVIValues )
   val ndviCorn = column(meanCornNDVIValues )
}

val plot = new Plot().title("Corn/Soy mean fAPAR/NDVI").width(800).height(400)
//Fixing Data ranges
val xdr = new DataRange1d()
val ydr = new DataRange1d()
plot.x_range(xdr).y_range(ydr)

val xaxis = new DatetimeAxis().plot(plot).axis_label("Date")
val yaxis = new LinearAxis().plot(plot).axis_label("Mean fAPAR/NDVI")
plot.below <<= (xaxis :: _)
plot.left <<= (yaxis :: _)

val line = new Line().x(source.date).y(source.fapar).line_dash(List(3 ,3))
val lineGlyph = new GlyphRenderer().data_source(source).glyph(line)

val ndviLine = new Line().x(source.date).y(source.ndvi).line_color("#00FF00")
val ndviLineGlyph = new GlyphRenderer().data_source(source).glyph(ndviLine)

val ndviCornLine = new Line().x(source.date).y(source.ndviCorn)
val ndviCornLineGlyph = new GlyphRenderer().data_source(source).glyph(ndviCornLine)

val legends = List("Corn - NDVI @300M" -> List(ndviCornLineGlyph),
            "Soy - NDVI @300M" -> List(ndviLineGlyph),
            "Soy - fAPAR @1KM" -> List(lineGlyph))

  val legend = new Legend().orientation(LegendOrientation.TopLeft).plot(plot).legends(legends)

plot.renderers <<= ( xaxis :: yaxis :: ndviCornLineGlyph :: ndviLineGlyph :: lineGlyph :: legend :: _ )

showPlot(plot)
import _root_.io.continuum.bokeh._
defined object source
plot: io.continuum.bokeh.Plot = io.continuum.bokeh.Plot@8883322
xdr: io.continuum.bokeh.DataRange1d = io.continuum.bokeh.DataRange1d@456bb17e
ydr: io.continuum.bokeh.DataRange1d = io.continuum.bokeh.DataRange1d@225094e
res34_5: $user.plot.SelfType = io.continuum.bokeh.Plot@8883322
xaxis: io.continuum.bokeh.DatetimeAxis = io.continuum.bokeh.DatetimeAxis@23c3763e
yaxis: io.continuum.bokeh.LinearAxis = io.continuum.bokeh.LinearAxis@19aff4ab
line: io.continuum.bokeh.Line = io.continuum.bokeh.Line@7f2209a8
lineGlyph: io.continuum.bokeh.GlyphRenderer = io.continuum.bokeh.GlyphRenderer@6fb70c8e
ndviLine: io.continuum.bokeh.Line = io.continuum.bokeh.Line@376d74a8
ndviLineGlyph: io.continuum.bokeh.GlyphRenderer = io.continuum.bokeh.GlyphRenderer@29e4018e
ndviCornLine: io.continuum.bokeh.Line = io.continuum.bokeh.Line@43d1a547
ndviCornLineGlyph: io.continuum.bokeh.GlyphRenderer = io.continuum.bokeh.GlyphRenderer@145502cf
legends: List[(String, List[io.continuum.bokeh.GlyphRenderer])] = List(
  ("Corn - NDVI @300M", List(io.continuum.bokeh.GlyphRenderer@145502cf)),
  ("Soy - NDVI @300M", List(io.continuum.bokeh.GlyphRenderer@29e4018e)),
  ("Soy - fAPAR @1KM", List(io.continuum.bokeh.GlyphRenderer@6fb70c8e))
)
legend: io.continuum.bokeh.Legend = io.continuum.bokeh.Legend@b9c3851

Plotting histograms

The code below plots a complete histogram. This is useful for debugging and validating our statistics computation.

In [26]:
def plotHistogram(hist:Histogram[Int]){
    import _root_.io.continuum.bokeh._
    val filteredValues = hist.values.filter(_>=0.0)
    val counts = for( value <- filteredValues) yield hist.itemCount(value)
    //note that bokeh works with doubles, so need to convert our list of ints
    object source extends ColumnDataSource {
       val value = column(filteredValues  map (_.toDouble))
       val rValue = column(filteredValues  map (_.toDouble + 1.0))
       val bottomValue = column(filteredValues  map ( _*0.0))
       val count = column(counts  map (_.toDouble))
    }

    val plot = new Plot().title("Histogram").width(800).height(400)
    //Fixing Data ranges
    val xdr = new DataRange1d()
    val ydr = new DataRange1d()
    plot.x_range(xdr).y_range(ydr)

    val xaxis = new LinearAxis().plot(plot).axis_label("Value")
    val yaxis = new LinearAxis().plot(plot).axis_label("Count")
    plot.below <<= (xaxis :: _)
    plot.left <<= (yaxis :: _)

    val quad = new Quad().top(source.count).left(source.value).right(source.rValue).bottom(source.bottomValue)
    val quadGlyph = new GlyphRenderer().data_source(source).glyph(quad)
    plot.renderers <<= ( xaxis :: yaxis :: quadGlyph :: _ )

    showPlot(plot)
}
defined function plotHistogram
In [44]:
plotHistogram(soyNDVIHistograms(timeKeysNDVI(16).instant))

In [46]:
plotHistogram(soyFaparHistograms(timeKeys(16).instant))

In [43]:
soyNDVIHistograms(timeKeysNDVI(20).instant).statistics
res42: Option[geotrellis.raster.summary.Statistics[Int]] = Some(
  Statistics(190L, 248.91052631578947, 250, 250, 3.513492000671293, 225, 250)
)