In this Jupyter notebook, we will build an SVM classifier for Snow/Ice detection using Spark for the Proba-V 100m Top Of Atmosphere (TOA) Radiometry data.
The Radiometry file is contained in a GeoTIFF file format. The file contains 4 raster bands:
Each raster band is a pixel grid $Y*X$ where each value of the grid represents the TOA Reflectance value for that band for that pixel.
There is also a status map file containing a single band. For each pixel in that band, the value represents the class of that pixel.
In our case, the flag we are interested in is the bitmask value $0b100$ (decimal 4): when this bit is set, the corresponding pixel in the Radiometry file is Snow or Ice, as documented in the Proba-V User Manual.
import numpy as np
import requests
import gdal
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
%matplotlib inline
import pyspark
In order to be able to access our Spark cluster, we first need to set up our Spark Context. The below snippet shows how to do this and also shows how to modify the default configuration.
from pyspark.conf import SparkConf

# Build the Spark configuration in one go: extra YARN memory overhead,
# plus per-executor memory and core counts.
conf = SparkConf().setAll([
    ('spark.yarn.executor.memoryOverhead', 1024),
    ('spark.executor.memory', '8g'),
    ('spark.executor.cores', '2'),
])
sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.SQLContext(sc)
The files are stored in a shared folder as defined below. The complete list of files can be requested using an API, but in our case we will be focusing on an area of the Alps, where there is plenty of Snow/Ice areas.
The Radiometry files end with _RADIOMETRY.tif whereas the status map ends with _SM.tif.
# Base GeoTIFF product paths to process. Each base path expands into a
# *_RADIOMETRY.tif and a *_SM.tif companion file (see the helpers below).
files = [
    "/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/20151121/PROBAV_S5_TOA_20151121_100M_V001/PROBAV_S5_TOA_X18Y02_20151121_100M_V001.tif"
]
# Names of raster bands 1..4 inside the radiometry file, in band order.
bands = [
    "RED",
    "NIR",
    "BLUE",
    "SWIR"
]
def radiometry_file(filename):
    """Derive the radiometry GeoTIFF path from a base product path."""
    stem = filename[:-4]  # drop the trailing ".tif"
    return stem + "_RADIOMETRY.tif"
def status_file(filename):
    """Derive the status-map GeoTIFF path from a base product path."""
    stem = filename[:-4]  # drop the trailing ".tif"
    return stem + "_SM.tif"
In order to read the files in a parallel manner, we need to instruct Spark to parallelize our list of files.
# Distribute the (statusmap, radiometry) path pairs as an RDD so that
# the file reads can be parallelized across executors.
pairs = [(status_file(name), radiometry_file(name)) for name in files]
data_files = sc.parallelize(pairs).cache()
data_files
Because our files are so large, we don't want to read the complete file in a single run. Instead, we will split each file into chunks, read each chunk and then combine the chunks.
This will also enable us to distribute the reading of our files to different Spark Executors, so that each Spark Executor reads a specific part of each file.
def makeSplits(files, splits=100):
    """Split a (statusmap, radiometry) file pair into rectangular chunks.

    Returns a list of chunk dicts, each describing the file paths plus an
    x/y window as (offset, size), suitable for GDAL ReadAsArray calls.
    """
    statusmap, radiometry = files
    dataset = gdal.Open(statusmap)
    band = dataset.GetRasterBand(1)
    # BUG FIX: read the raster dimensions BEFORE releasing the dataset.
    # A GDAL band object is only valid while its parent dataset is alive;
    # the original code did `del dataset` first and then touched the band.
    XSize = band.XSize
    YSize = band.YSize
    del dataset
    # Guard against a zero step when splits exceeds a raster dimension
    # (the original would raise "range() arg 3 must not be zero").
    chunksize = (max(1, int(XSize / float(splits))),
                 max(1, int(YSize / float(splits))))
    chunks = []
    for x in range(0, XSize, chunksize[0]):
        for y in range(0, YSize, chunksize[1]):
            chunks.append({
                'statusmap': statusmap,
                'radiometry': radiometry,
                # (offset, size); size is clipped at the raster edge.
                'x': (x, min(XSize - x, chunksize[0])),
                'y': (y, min(YSize - y, chunksize[1]))
            })
    return chunks
# Expand every file pair into its chunk descriptors and spread them over
# 100 partitions so each executor reads a slice of the files.
chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
We can now define the functions to read each chunk.
In this process, what we want to accomplish is to end up with a list such that each element is a single pixel from the GeoTIFF files containing both the corresponding bitmask from the statusmap and the Reflectance for each individual band.
def parseTargets(statusmap, x, y):
    """Read the status-map window (x, y as (offset, size)) for one chunk.

    Returns a flat list in column-major (Fortran) order so that the pixel
    ordering lines up with the features produced by parseFeatures.
    """
    dataset = gdal.Open(statusmap)
    window = dataset.GetRasterBand(1).ReadAsArray(x[0], y[0], x[1], y[1])
    del dataset
    return np.array(window).flatten(order='F').tolist()
def parseFeatures(radiometry, x, y):
    """Read all radiometry bands for one chunk window.

    Returns one reflectance row [band1, ..., bandN] per pixel, ordered to
    match the column-major flattening used by parseTargets.
    """
    raster = gdal.Open(radiometry)
    band_count = raster.RasterCount
    # Shape after stacking: bands x rows x cols.
    band_windows = [
        raster.GetRasterBand(b).ReadAsArray(x[0], y[0], x[1], y[1])
        for b in range(1, band_count + 1)
    ]
    del raster
    # Reversing the axes yields cols x rows x bands ...
    stacked = np.transpose(band_windows)
    # ... which flattens (C order) to one bands-vector per pixel,
    # column (x) varying slowest — matching parseTargets' order='F'.
    pixels = stacked.reshape((stacked.shape[0] * stacked.shape[1],
                              stacked.shape[2]))
    return pixels.tolist()
def parseChunk(chunk):
    """Pair each pixel's status value with its band features for one chunk."""
    targets = parseTargets(chunk['statusmap'], chunk['x'], chunk['y'])
    features = parseFeatures(chunk['radiometry'], chunk['x'], chunk['y'])
    return zip(targets, features)
# One element per pixel: (status bitmask, [RED, NIR, BLUE, SWIR]).
dataset = chunks.flatMap(parseChunk)
dataset.take(5)
Some pixels contain invalid data. In such cases, one of the Reflectance values will be equal to $-1$. Since those pixels contain incomplete data, we might as well filter them out.
def is_valid(row):
    """Return True when no band reflectance in this pixel equals -1.

    `row` is a (status, features) pair; -1 in any band marks invalid data.
    Uses all() instead of the original manual loop-and-flag.
    """
    return all(v != -1 for v in row[1])
# Drop pixels with any invalid (-1) reflectance, then rebalance partitions.
dataset = dataset.filter(is_valid).repartition(100)
dataset.take(5)
As mentioned earlier, the mask for Snow/Ice is $100$. Since we are only interested in those, we can define a function to convert the complete bitmask into a single bit equal to 1 if the pixel is Snow/Ice and 0 otherwise.
def is_snow(row):
    """Collapse the status bitmask to a binary snow/ice label.

    Bit 2 (0b100) set means snow/ice; the features pass through unchanged.
    """
    mask, features = row
    label = 1 if mask & 0b100 else 0
    return (label, features)
# Replace the raw bitmask with the binary snow label; cache for reuse.
dataset = dataset.map(is_snow).cache()
dataset.take(5)
Since this is a dataset we will be using very often, we might as well cache it. The following snippet instructs Spark exactly of this.
dataset = dataset.cache()
Now we are ready to do some visualizations. Before we go further, let's take a balanced sample from our dataset, meaning about the same number of positive (snow/ice) and negative samples.
from pyspark.mllib.regression import LabeledPoint

def parseSample(row):
    """Wrap a (label, features) pair into a Spark MLlib LabeledPoint."""
    label, features = row
    return LabeledPoint(label, features)
def sample(size):
    """Draw a roughly class-balanced sample of about `size` pixels.

    Computes per-class counts, then samples each class at the fraction
    that yields ~size/2 positives and ~size/2 negatives, and returns the
    result as a cached RDD of LabeledPoints.
    """
    counts = dataset.countByKey()
    half = float(size / 2)  # integer halving under Python 2, then float
    fractions = {
        0.0: half / counts[0.0],
        1.0: half / counts[1.0]
    }
    balanced = dataset.sampleByKey(
        withReplacement=False,
        fractions=fractions
    )
    return balanced.map(parseSample).cache()
# Draw a ~500-pixel balanced sample, pull it to the driver, and build a
# pandas DataFrame with one column per band plus the binary snow label.
samples = sample(500)
positives = samples.filter(lambda r: r.label == 1).map(lambda r: np.append(r.features, r.label)).collect()
negatives = samples.filter(lambda r: r.label == 0).map(lambda r: np.append(r.features, r.label)).collect()
all_data = positives + negatives
df = pd.DataFrame(all_data, columns=bands + ["snow"])
df
We can now visualize our sample.
The following box plots show us that there are indeed notable differences in the distributions for snow and not snow, confirming that building a classifier should be possible.
# One box plot per band, split by the snow/not-snow label.
for band in bands:
    sns.boxplot(x='snow', y=band, order=[0, 1], data=df)
    plt.show()
Another way to look at the data is using scatter plots, to see if there is any correlations between the different bands but also to see if there is any interaction between the bands for the snow class.
The following pair plots show that:
This confirms the following:
sns.pairplot(df, hue='snow', vars=bands, hue_order=[0, 1])
First, we need to do some preprocessing before we can build our classifier.
So let's do just that, as follows:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.ml import Pipeline
def transform(data):
    """Expand features to degree-2 polynomials and standardize them.

    Fits a PolynomialExpansion + StandardScaler pipeline on `data`
    (an RDD of LabeledPoints) and returns (fitted_transformer,
    RDD of LabeledPoints with the scaled polynomial features).
    """
    stages = [
        PolynomialExpansion(
            inputCol="features",
            outputCol="polyFeatures",
            degree=2
        ),
        StandardScaler(
            withMean=True,
            withStd=True,
            inputCol="polyFeatures",
            outputCol="scaledFeatures"
        ),
    ]
    frame = data.toDF()
    fitted = Pipeline(stages=stages).fit(frame)
    scaled = fitted.transform(frame).map(lambda row: row.scaledFeatures)
    labels = data.map(lambda p: p.label)
    # zip relies on both RDDs keeping the same ordering/partitioning.
    return (fitted, labels.zip(scaled).map(parseSample))
# Fit the preprocessing pipeline on a 10k balanced sample; keep the
# fitted transformer for reuse on the full image later.
transformer, dataset = transform(sample(10000))
dataset = dataset.cache()
dataset.take(1)
We will train our classifier using 75% of the dataset and test it on the remaining 25%. So let's split it first:
# 75/25 train/test split; cache both sides since each is scanned
# repeatedly during training and evaluation.
train_data, test_data = dataset.randomSplit([0.75, 0.25])
train_data = train_data.cache()
test_data = test_data.cache()
Finally, we can train and test our model.
from sklearn.metrics import precision_recall_fscore_support
def train(training_data, iterations, regParam, step):
    """Fit a linear SVM with SGD on an RDD of LabeledPoints."""
    return SVMWithSGD.train(
        training_data,
        iterations=iterations,
        regParam=regParam,
        step=step,
    )
def evaluate(model, train_data, test_data):
    """Return (train_f1, test_f1) for the model on the two splits."""
    def binary_f1(split):
        # Collect ground truth and predictions to the driver, then score.
        truth = split.map(lambda p: p.label).collect()
        predicted = split.map(lambda p: model.predict(p.features)).collect()
        _, _, f, _ = precision_recall_fscore_support(
            truth, predicted, average='binary')
        return f
    return (binary_f1(train_data), binary_f1(test_data))
def train_evaluate(train_data, test_data, iterations, regParam, step):
    """Train an SVM with the given hyper-parameters and report train/test F1.

    Returns (model, (sample_count, iterations, regParam, step,
    train_f1, test_f1)).
    """
    # NOTE(review): train_data.count() is evaluated twice (here and in the
    # returned tuple), triggering two Spark jobs.
    print "Training with", train_data.count(), "samples"
    print "Params: ", iterations, regParam, step
    model = train(train_data, iterations, regParam, step)
    train_f, test_f = evaluate(model, train_data, test_data)
    print "Train F1", train_f
    print "Test F1", test_f
    print ""
    return (model, (train_data.count(), iterations, regParam, step, train_f, test_f))
# Baseline run: 100 SGD iterations, step 1.0, no regularization.
model, results = train_evaluate(train_data,
                                test_data,
                                iterations=100,
                                step=1.0,
                                regParam=0.)
We can draw the GeoTIFF using matplotlib. But the files are so big that we also need to reduce their resolution. We can re-use the chunking mechanism used previously and plot the average values for every chunk instead.
However, we will need to keep track of the position of the chunk, so we add that to our function.
def makeSplits(files, splits=100):
    """Split a file pair into a splits x splits grid of chunks.

    Unlike the earlier version, each chunk also records its (x, y) grid
    position so the per-chunk averages can be reassembled into an image.
    Assumes both raster dimensions are >= splits.
    """
    statusmap, radiometry = files
    sm = gdal.Open(statusmap)
    band = sm.GetRasterBand(1)
    # BUG FIX: capture the dimensions while the dataset is still open.
    # A GDAL band is invalidated once its parent dataset is released;
    # the original code did `del sm` before reading XSize/YSize.
    XSize = band.XSize
    YSize = band.YSize
    del sm
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    chunks = []
    for x in range(0, splits):
        for y in range(0, splits):
            chunks.append({
                'statusmap': statusmap,
                'radiometry': radiometry,
                # Grid position, used later to place the chunk in the image.
                'chunk': (x, y),
                # (offset, size) windows for GDAL reads.
                'x_range': (x * chunksize[0], chunksize[0]),
                'y_range': (y * chunksize[1], chunksize[1])
            })
    return chunks
# Rebuild the chunk descriptors, now with grid positions attached.
chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
Now that our chunks have a position, we need to propagate the position up to the pixel. We then take the average for every chunk and then plot the average.
def is_snow_mask(mask):
    """Return 1 when the snow/ice bit (0b100) is set in the status mask."""
    return 1 if mask & 0b100 else 0
def parseChunk(chunk):
    """Read one chunk and return (grid_position, list of LabeledPoints)."""
    labels = [
        is_snow_mask(m)
        for m in parseTargets(chunk['statusmap'], chunk['x_range'], chunk['y_range'])
    ]
    feats = parseFeatures(chunk['radiometry'], chunk['x_range'], chunk['y_range'])
    points = [parseSample(pair) for pair in zip(labels, feats)]
    return (chunk['chunk'], points)
all_data = chunks.map(parseChunk)
def average_snow(data):
    """Mean of the snow labels over a chunk's points (snow fraction)."""
    labels = [point.label for point in data]
    return np.mean(labels)
# Per-chunk snow fraction keyed by grid position, collected to the driver.
averaged_by_chunk = all_data.map(lambda x: (x[0], average_snow(x[1]))).cache()
averaged_by_chunk.count()
img_flat = averaged_by_chunk.collect()
Here is what our original data looked like.
# img_flat holds ((x, y), fraction) pairs; column 1 is the fraction.
# Fortran-order reshape matches the x-outer / y-inner chunk enumeration.
img = np.array(img_flat)[:, 1].reshape(100, 100, order='F').astype('float')
plt.imshow(img, cmap='winter')
plt.colorbar()
Now, instead of drawing the original Snow/Ice component of our pixels, we will instead use our classifier to predict that value and then draw that one instead.
# Flatten every pixel together with its chunk position, run it through the
# fitted preprocessing pipeline, predict with the trained SVM, and regroup
# the predictions by chunk position to compute per-chunk means.
all_df = all_data.flatMap(lambda x: [{'x': x[0][0], 'y': x[0][1], 'features': p.features, 'label': p.label} for p in x[1]]).toDF()
groups = transformer.transform(all_df).map(lambda x: ((x.x, x.y), model.predict(x.scaledFeatures))).groupByKey().cache()
averaged_by_chunk = groups.map(lambda g: (g[0], np.mean(list(g[1])))).collect()
# Paint the per-chunk predicted snow fraction into a 100x100 image.
# Note the (row, col) = (y, x) transposition expected by imshow.
img = np.zeros((100, 100))
for x in np.array(averaged_by_chunk):
    pos, v = x
    img[pos[1]][pos[0]] = v
Here's how our classifier estimated snow. It looks roughly the same as the original one, except it looks to be slightly more permissive for the snow class.
# Predicted snow-fraction map, same colormap as the original for comparison.
plt.imshow(img, cmap='winter')
plt.colorbar()