Snow Detection Using Spark

Introduction

In this Jupyter notebook, we will build an SVM classifier for Snow/Ice detection using Spark for the Proba-V 100m Top Of Atmosphere (TOA) Radiometry data.

Data

Radiometry Data

The Radiometry data is stored in GeoTIFF format. Each file contains 4 raster bands:

  1. RED
  2. NIR
  3. BLUE
  4. SWIR

Each raster band is a $Y \times X$ pixel grid, where each value represents the TOA reflectance for that band at that pixel.
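
For example, a single band can be read into a numpy array with the gdal library used later in this notebook. The snippet below is only a minimal sketch and the filename is a placeholder:

import gdal

# Minimal sketch: read a small window of one band from a (placeholder) file.
dataset = gdal.Open("PROBAV_..._RADIOMETRY.tif")
band = dataset.GetRasterBand(1)               # band 1 = RED
window = band.ReadAsArray(0, 0, 100, 100)     # numpy array of shape (100, 100), i.e. (Y, X)
print window.shape
del dataset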

Status Map

There is also a status map file containing a single band. For each pixel, the value is a bitmask encoding classification and quality flags for that pixel.

In our case, the flag we are interested in is the binary mask $100$ (bit 2), which indicates that the corresponding pixel in the Radiometry file is Snow or Ice, as documented in the Proba-V User Manual.
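
For example, a status value of $244$ is $11110100$ in binary, so its Snow/Ice bit is set, whereas $123$ ($1111011$ in binary) has it cleared. A quick illustration in Python, mirroring the test we will use later:

snow_ice_mask = 0b100
print 244 & snow_ice_mask != 0   # True: bit 2 of 244 (0b11110100) is set
print 123 & snow_ice_mask != 0   # False: bit 2 of 123 (0b1111011) is clear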

Reading the files

Imports

We will be using the following libraries / frameworks:

  1. numpy: for numerical processing
  2. gdal: for reading GeoTIFF files
  3. seaborn: for plotting
  4. pandas: for handling small DataFrames
  5. pyspark: for distributed data processing with Spark

In [1]:
import numpy as np
import requests
import gdal

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
%matplotlib inline

import pyspark
/usr/local/lib/python2.7/site-packages/matplotlib/__init__.py:872: UserWarning: axes.color_cycle is deprecated and replaced with axes.prop_cycle; please use the latter.
  warnings.warn(self.msg_depr % (key, alt_key))

Setting up the Spark Context

In order to access our Spark cluster, we first need to set up our Spark context. The snippet below shows how to do this and how to override parts of the default configuration.

In [2]:
from pyspark.conf import SparkConf
conf = SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', 1024)
conf.set('spark.executor.memory', '8g')
conf.set('spark.executor.cores', '2')
sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.SQLContext(sc)

The files are stored in a shared folder as defined below. The complete list of files can be requested using an API, but in our case we will focus on an area of the Alps, where there are plenty of Snow/Ice pixels.
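
For reference, such an API request could be made with the requests library imported earlier. The sketch below is purely illustrative; the endpoint URL and parameters are hypothetical placeholders, not the actual catalogue API:

# Hypothetical sketch: the endpoint URL and parameters are placeholders.
response = requests.get(
    "https://example.org/probav/catalogue",
    params={"date": "20151121", "product": "PROBAV_L3_S5_TOA_100M"}
)
file_list = response.json()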

The Radiometry files end with _RADIOMETRY.tif, whereas the status map files end with _SM.tif.

In [3]:
files = [
    "/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001.tif"
]

bands = [
    "RED",
    "NIR",
    "BLUE",
    "SWIR"
]

def radiometry_file(filename):
    return filename[:-4] + "_RADIOMETRY.tif"

def status_file(filename):
    return filename[:-4] + "_SM.tif"

Processing the files

In order to read the files in a parallel manner, we need to instruct Spark to parallelize our list of files.

In [4]:
data_files = sc.parallelize([(status_file(f), radiometry_file(f)) for f in files]).cache()
data_files
Out[4]:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:396

Because our files are so large, we don't want to read a complete file in one go. Instead, we will split each file into chunks, read the chunks independently and then combine them.

This will also enable us to distribute the reading of our files to different Spark Executors, so that each Spark Executor reads a specific part of each file.

In [5]:
def makeSplits( files, splits=100 ):
    statusmap, radiometry = files
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    # Read the raster dimensions before releasing the dataset,
    # as the band is only valid while its dataset is open.
    XSize = status.XSize
    YSize = status.YSize
    del dataset
    
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, XSize, chunksize[0]):
        for y in range(0, YSize, chunksize[1]):
            chunks.append({
                    'statusmap': statusmap,
                    'radiometry': radiometry,
                    'x': (x, min(XSize - x, chunksize[0])),
                    'y': (y, min(YSize - y, chunksize[1]))
                })
    
    return chunks

chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
Out[5]:
[{'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001_RADIOMETRY.tif',
  'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001_SM.tif',
  'x': (0, 100),
  'y': (0, 100)}]

We can now define the functions to read each chunk.

What we want to end up with is a list in which each element is a single pixel from the GeoTIFF files, holding both the corresponding bitmask from the status map and the reflectance for each individual band.

In [6]:
def parseTargets(statusmap, x, y):
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    ret = status.ReadAsArray(x[0], y[0], x[1], y[1])
    del dataset
    return np.array(ret).flatten(order='F').tolist()
    
def parseFeatures( radiometry, x, y ):
    raster = gdal.Open(radiometry)
    raster_bands = [ raster.GetRasterBand(i).ReadAsArray(x[0], y[0], x[1], y[1]) for i in xrange(1, raster.RasterCount + 1) ]
    # 4 * Y * X
    
    del raster
    raster_bands = np.transpose(raster_bands)
    # X * Y * 4 (np.transpose reverses the axes)

    raster_bands = raster_bands.reshape((len(raster_bands) * len(raster_bands[0]), len(raster_bands[0][0])))
    
    # (X * Y) * 4, matching the Fortran-order flattening used in parseTargets
    return raster_bands.tolist()

def parseChunk(chunk):
    return zip(
        parseTargets(chunk['statusmap'], chunk['x'], chunk['y']), 
        parseFeatures(chunk['radiometry'], chunk['x'], chunk['y'])
    )

dataset = chunks.flatMap(parseChunk)
dataset.take(5)
Out[6]:
[(244, [764, 804, 874, 305]),
 (244, [767, 805, 876, 306]),
 (244, [768, 809, 879, 306]),
 (244, [775, 810, 880, 305]),
 (244, [773, 814, 879, 305])]

Some pixels contain invalid data. In such cases, at least one of the reflectance values will be equal to $-1$. Since those pixels are incomplete, we filter them out.

In [7]:
def is_valid(row):
    for v in row[1]:
        if v == -1:
            return False
    return True

dataset = dataset.filter(is_valid).repartition(100)
dataset.take(5)
Out[7]:
[(123, [1418, 1482, 2055, 618]),
 (123, [1376, 1434, 1998, 586]),
 (251, [1331, 1395, 1403, 573]),
 (251, [1307, 1351, 1405, 578]),
 (251, [1318, 1355, 1406, 594])]

As mentioned earlier, the binary mask for Snow/Ice is $100$. Since that is the only flag we are interested in, we can define a function that converts the full bitmask into a single label: 1 if the pixel is Snow/Ice and 0 otherwise.

In [8]:
def is_snow(row):
    return (int(row[0] & 0b100 != 0) , row[1])

dataset = dataset.map(is_snow).cache()
dataset.take(5)
Out[8]:
[(1, [675, 695, 783, 304]),
 (1, [674, 694, 773, 300]),
 (1, [1456, 1593, 1490, 478]),
 (1, [1392, 1540, 1485, 460]),
 (1, [1392, 1512, 1494, 448])]

Since we will be using this dataset very often, we might as well cache it. The following snippet instructs Spark to do exactly that.

In [9]:
dataset = dataset.cache()

Visualizing the data

Now we are ready to do some visualizations. Before we go further, let's take a balanced sample from our dataset, i.e. one with roughly the same number of positive (Snow/Ice) and negative samples.

In [10]:
from pyspark.mllib.regression import LabeledPoint

def parseSample(row):
    return LabeledPoint( row[0], row[1])

def sample(size):
    sizes = dataset.countByKey()
    sample_fractions = {
        0.0: float(size / 2) / sizes[0.0],
        1.0: float(size / 2) / sizes[1.0] 
    }
    
    samples = dataset.sampleByKey( 
        withReplacement = False, 
        fractions = sample_fractions
    ).map(parseSample).cache()
    
    return samples

samples = sample(500)
positives = samples.filter(lambda r: r.label == 1).map(lambda r: np.append(r.features, r.label)).collect()
negatives = samples.filter(lambda r: r.label == 0).map(lambda r: np.append(r.features, r.label)).collect()
all_data = positives + negatives
df = pd.DataFrame(all_data, columns=bands + ["snow"])
df
Out[10]:
RED NIR BLUE SWIR snow
0 895 1077 974 371 1
1 990 1188 1080 478 1
2 644 799 789 416 1
3 1184 1213 1321 400 1
4 647 760 746 321 1
5 948 997 1051 437 1
6 1120 1242 1169 382 1
7 1007 1050 1142 429 1
8 686 714 786 296 1
9 1392 1526 1293 458 1
10 815 950 913 315 1
11 876 999 977 342 1
12 705 812 802 226 1
13 890 944 1038 348 1
14 729 754 894 351 1
15 1285 1374 1399 356 1
16 1036 1099 1168 435 1
17 1054 1083 1242 186 1
18 711 749 866 389 1
19 1062 1185 1131 448 1
20 964 1044 1088 435 1
21 846 884 952 428 1
22 725 893 817 332 1
23 961 941 1166 427 1
24 863 986 968 320 1
25 741 957 817 461 1
26 676 714 819 477 1
27 965 1084 1089 331 1
28 895 1008 1030 306 1
29 846 861 1019 431 1
... ... ... ... ... ...
478 179 596 341 317 0
479 1177 1282 1263 541 0
480 1479 1603 2010 1130 0
481 131 416 337 172 0
482 500 645 613 286 0
483 109 135 310 65 0
484 1023 1097 1138 621 0
485 1468 1570 1379 906 0
486 1238 1481 1218 1110 0
487 526 555 741 280 0
488 152 515 323 273 0
489 975 1052 1107 551 0
490 2068 2462 1544 694 0
491 285 236 558 215 0
492 196 142 401 89 0
493 236 281 428 156 0
494 849 1001 934 571 0
495 1135 1189 1283 547 0
496 143 86 389 45 0
497 1727 1931 1372 887 0
498 202 488 287 527 0
499 1183 1227 1325 508 0
500 378 335 560 150 0
501 1158 1211 1332 625 0
502 110 168 307 89 0
503 543 714 643 262 0
504 328 556 476 390 0
505 1642 1774 1464 581 0
506 338 749 449 490 0
507 173 677 287 318 0

508 rows × 5 columns

We can now visualize our sample.

The following box plots show that there are indeed notable differences between the distributions for snow and non-snow pixels, suggesting that building a classifier should be feasible.

In [11]:
for band in bands:
    sns.boxplot(x='snow', y=band, order=[0, 1], data=df)
    plt.show()
/usr/local/lib/python2.7/site-packages/matplotlib/__init__.py:892: UserWarning: axes.color_cycle is deprecated and replaced with axes.prop_cycle; please use the latter.
  warnings.warn(self.msg_depr % (key, alt_key))

Another way to look at the data is with scatter plots, to see whether there are any correlations between the different bands, and also whether there are interactions between the bands for the snow class.

The following pair plots show that:

  1. There is a high correlation between RED, NIR and BLUE.
  2. There is a clear cutoff when RED > 500 and SWIR < 500 (we sanity-check this rule right after the pair plot below).

This confirms the following:

  1. The classes are separable, but
  2. separation requires interaction features (they are not linearly separable in the raw band space alone).

In [12]:
sns.pairplot(df, hue='snow', vars=bands, hue_order=[0, 1])
Out[12]:
<seaborn.axisgrid.PairGrid at 0x7f848fcf9910>
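
As a quick sanity check of that visual cutoff (purely illustrative, not part of the model we build below), we can measure how often the naive rule RED > 500 and SWIR < 500 agrees with the snow label in our sampled DataFrame:

# Illustrative only: how often the naive threshold rule matches the snow label.
naive_rule = (df['RED'] > 500) & (df['SWIR'] < 500)
print (naive_rule == (df['snow'] == 1)).mean()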

Building the classifier

First, we need to do some preprocessing before we can build our classifier.

  1. SVMs work better when the features are rescaled.
  2. We need to introduce interaction variables, e.g. by building a polynomial expansion.
  3. SVMs generally require the dataset to be balanced (or the use of class weights). Since we have so many samples available, we will simply balance the dataset by undersampling, i.e. taking roughly the same number of pixels from each class.

So let's do just that, as follows:

In [13]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.ml import Pipeline

def transform(data):
    polyExpansion = PolynomialExpansion(
        inputCol="features",
        outputCol="polyFeatures",
        degree=2
    )

    scaler = StandardScaler(
        withMean=True,
        withStd=True,
        inputCol="polyFeatures",
        outputCol="scaledFeatures"
    )

    pipeline = Pipeline(stages=[polyExpansion, scaler])

    X = data.toDF()
    transformer = pipeline.fit(X)
    X = transformer.transform(X).map(lambda x: x.scaledFeatures)
    y = data.map(lambda p: p.label)
    return (transformer, y.zip(X).map(parseSample))

transformer, dataset = transform(sample(10000))
dataset = dataset.cache()
dataset.take(1)
Out[13]:
[LabeledPoint(1.0, [-0.532192002519,-0.661825760036,-0.732787763956,-0.720936248622,-0.785935189503,-0.601731206957,-0.673370961935,-0.742999843046,-0.639860445239,-0.722099325158,-0.688807723097,-0.752183284034,-0.729910920645,-0.627274019683])]
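
As a side note, the scaled vectors have 14 components because the degree-2 polynomial expansion of the 4 bands produces the 4 original terms, their 4 squares and the 6 pairwise products: $4 + 4 + 6 = 14$.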

We will train our classifier using 75% of the dataset and test it on the remaining 25%. So let's split it first:

In [14]:
train_data, test_data = dataset.randomSplit([0.75, 0.25])
train_data = train_data.cache()
test_data = test_data.cache()

Finally, we can train and test our model.

In [15]:
from sklearn.metrics import precision_recall_fscore_support

def train(training_data, iterations, regParam, step):
    model = SVMWithSGD.train(training_data, iterations=iterations, regParam=regParam, step=step)
    return model

def evaluate(model, train_data, test_data):
    train_y = train_data.map(lambda p: p.label).collect()
    test_y = test_data.map(lambda p: p.label).collect()
    train_predictions = train_data.map(lambda p: model.predict(p.features)).collect()
    test_predictions = test_data.map(lambda p: model.predict(p.features)).collect()
    
    _, _, train_f, _ = precision_recall_fscore_support(train_y, train_predictions, average='binary')
    _, _, test_f, _ = precision_recall_fscore_support(test_y, test_predictions, average='binary')
    
    return (train_f, test_f)

def train_evaluate(train_data, test_data, iterations, regParam, step):    
    print "Training with", train_data.count(), "samples"
    print "Params: ", iterations, regParam, step
    model = train(train_data, iterations, regParam, step)
    train_f, test_f = evaluate(model, train_data, test_data)
    
    print "Train F1", train_f
    print "Test F1", test_f
    print ""
    return (model, (train_data.count(), iterations, regParam, step, train_f, test_f))

model, results = train_evaluate(train_data,
                         test_data,
                         iterations=100, 
                         step=1.0, 
                         regParam=0.)
Training with 7392 samples
Params:  100 0.0 1.0
Train F1 0.904526435907
Test F1 0.899344388739
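
Since train_evaluate takes the hyperparameters as arguments, a small sweep is easy to express. The snippet below is only an illustrative sketch; the regParam and step values are arbitrary examples, not tuned results:

# Illustrative sweep; the regParam and step values are arbitrary examples.
sweep_results = []
for regParam in [0.0, 0.01, 0.1]:
    for step in [0.1, 1.0]:
        _, res = train_evaluate(train_data, test_data,
                                iterations=100, regParam=regParam, step=step)
        sweep_results.append(res)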

Visualising the output

We can draw the GeoTIFF using matplotlib, but the files are so big that we also need to reduce their resolution. We can re-use the chunking mechanism from before and plot the average value for every chunk instead.

However, we will need to keep track of the position of each chunk, so we add that to our splitting function.

In [25]:
def makeSplits( files, splits=100 ):
    statusmap, radiometry = files
    sm = gdal.Open(statusmap)
    status = sm.GetRasterBand(1)
    # As before, read the dimensions before releasing the dataset.
    XSize = status.XSize
    YSize = status.YSize
    del sm
    
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, splits):
        for y in range(0, splits):
            chunks.append({
                    'statusmap': statusmap,
                    'radiometry': radiometry,
                    'chunk': (x, y),
                    'x_range': (x * chunksize[0], chunksize[0]),
                    'y_range': (y * chunksize[1], chunksize[1])
                })
    
    return chunks

chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
Out[25]:
[{'chunk': (0, 0),
  'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001_RADIOMETRY.tif',
  'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001_SM.tif',
  'x_range': (0, 100),
  'y_range': (0, 100)}]

Now that our chunks have a position, we need to carry that position along with the pixels they contain. We then take the average snow value for every chunk and plot it.

In [17]:
def is_snow_mask(mask):
    return (int(mask & 0b100 != 0))

def parseChunk(chunk):
    statusmap = map(is_snow_mask, parseTargets(chunk['statusmap'], chunk['x_range'], chunk['y_range']))
    features = parseFeatures(chunk['radiometry'], chunk['x_range'], chunk['y_range'])
    return (chunk['chunk'], map(parseSample, zip(statusmap, features)))

all_data = chunks.map(parseChunk)

def average_snow(data):
    return np.mean(map(lambda x: x.label, data))

averaged_by_chunk = all_data.map(lambda x: (x[0], average_snow(x[1]))).cache()
averaged_by_chunk.count()
Out[17]:
10000
In [18]:
img_flat = averaged_by_chunk.collect()

Here is what our original data looks like.

In [19]:
img = np.array(img_flat)[:, 1].reshape(100, 100, order='F').astype('float')
plt.imshow(img, cmap='winter')
plt.colorbar()
Out[19]:
<matplotlib.colorbar.Colorbar at 0x7f848c46fad0>

Now, instead of drawing the original Snow/Ice flag of our pixels, we will use our classifier to predict that value and draw the prediction.

In [20]:
all_df = all_data.flatMap(lambda x: [{'x': x[0][0], 'y': x[0][1], 'features': p.features, 'label': p.label} for p in x[1]]).toDF()
/usr/hdp/current/spark-client/python/pyspark/sql/context.py:209: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
In [21]:
groups = transformer.transform(all_df).map(lambda x: ((x.x, x.y), model.predict(x.scaledFeatures))).groupByKey().cache()
In [22]:
averaged_by_chunk = groups.map(lambda g: (g[0], np.mean(list(g[1])))).collect()
In [23]:
img = np.zeros((100, 100))
for x in np.array(averaged_by_chunk):
    pos, v = x
    img[pos[1]][pos[0]] = v

Final result

Here is how our classifier estimated snow. It looks roughly the same as the original image, except that it appears slightly more permissive for the snow class.

In [24]:
plt.imshow(img, cmap='winter')
plt.colorbar()
Out[24]:
<matplotlib.colorbar.Colorbar at 0x7f848c111fd0>
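
To put a rough number on that visual impression (an illustrative check, not part of the original analysis), we could rebuild the original averaged image from img_flat and compare it with the predicted one:

# Illustrative comparison: img currently holds the predicted averages,
# so rebuild the original averaged image from img_flat first.
img_original = np.zeros((100, 100))
for pos, v in img_flat:
    img_original[pos[1]][pos[0]] = v
print "Mean absolute difference:", np.abs(img - img_original).mean()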