
Add comments to Decision tree algorithm

Decision tree code now has comments explaining how the code works.
Branch: pull/32/head
Author: James Sherratt, 5 years ago
Commit: a762a81271

1 changed file with 165 additions and 29 deletions:
ML Cookbook/DecisionTree.py (+165 / -29)

@@ -1,5 +1,5 @@
 """
-:Author: james
+:Author: James Sherratt
 :Date: 21/10/2019
 :License: MIT
@@ -7,6 +7,10 @@
 Basic implementation of a binary decision tree algorithm, with one
 discriminant per node.
+Useful links:
+https://scikit-learn.org/stable/modules/tree.html
+https://en.wikipedia.org/wiki/Decision_tree
 """
 import numpy as np
@@ -18,7 +22,7 @@ def proportion_k(ym):
     Get the proportions of each class in the current set of values.
     :param ym: y values (class) of the data at a given node.
-    :return:
+    :return: list containing the classes and the fraction of each class present.
     """
     counts = list(np.unique(ym, return_counts=True))
     counts[1] = counts[1]/(ym.shape[0])
@@ -27,34 +31,51 @@ def proportion_k(ym):

 def gini(k_proportions):
     """
-    Gini impurity function.
+    Gini impurity function. This is used to determine the impurity of a given
+    set of data, given the proportions of the classes in the dataset.
+    This is equivalent to:
+    H = sum(pk*(1-pk)) over all k classes.
-    :param k_proportions:
-    :return:
+    k_proportions, in this case, is an array of pk's.
+    :param k_proportions: array containing the proportions of the different classes. Proportions sum to 1.
+    :return: the impurity of the dataset.
     """
     return (k_proportions*(1-k_proportions)).sum()
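For illustration, a minimal sketch of what these two functions compute, assuming proportion_k returns the [classes, proportions] pair (its return statement sits outside this hunk):

import numpy as np

ym = np.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1])  # 60% class 0, 40% class 1
k_prop = proportion_k(ym)                       # [array([0, 1]), array([0.6, 0.4])]
print(gini(k_prop[1]))                          # 0.6*0.4 + 0.4*0.6 = 0.48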

 def node_impurity(ym):
     """
-    Calculate the impurity of data at a given node of the tree.
+    Calculate the impurity of the data on one side of a node after a split.
-    :param ym:
-    :return:
+    :param ym: actual y data for the selected dataset.
+    :return: dict containing the impurity value of the side and the most common class on that side.
     """
     if ym.shape[0] == 0:
-        return {"impurity": 0, "max_group": 0}
+        return {"impurity": 0, "max_class": 0}
     k_prop = proportion_k(ym)
-    return {"impurity": gini(k_prop[1]), "max_group": k_prop[0][np.argmax(k_prop[1])]}
+    return {"impurity": gini(k_prop[1]), "max_class": k_prop[0][np.argmax(k_prop[1])]}

 def disc_val_impurity(yleft, yright):
     """
-    Calculate the level of impurity left in the given data split.
+    Calculate the level of impurity left in the data after a split. This returns
+    a dict which contains:
+    - The impurity of the data after being split.
+    - The class with the largest proportion on the left and on the right of the split.
-    :param yleft:
-    :param yright:
-    :return:
+    The aim is to find a split which minimises impurity.
+    The impurity calculated is:
+    G = (nleft/ntot)*Hleft + (nright/ntot)*Hright
+    This gives the impurity of the split data.
+    :param yleft: real/training y values for the data on the left.
+    :param yright: real/training y values for the data on the right.
+    :return: dict containing the data impurity after the split and the most common class on the left and right of the split.
     """
     nleft = yleft.shape[0]
     nright = yright.shape[0]
@@ -64,25 +85,43 @@ def disc_val_impurity(yleft, yright):
     return {
         "impurity": ((nleft/ntot)*left_imp["impurity"])+((nright/ntot)*right_imp["impurity"]),
-        "lmax_group": left_imp["max_group"],
-        "rmax_group": right_imp["max_group"]
+        "lmax_class": left_imp["max_class"],
+        "rmax_class": right_imp["max_class"]
     }
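A quick worked example of the split impurity G, assuming the functions above are importable:

import numpy as np

yleft = np.array([0, 0, 0, 0])    # pure left side: Hleft = 0
yright = np.array([0, 0, 1, 1])   # 50/50 right side: Hright = 0.5
imp = disc_val_impurity(yleft, yright)
print(imp["impurity"])                        # (4/8)*0 + (4/8)*0.5 = 0.25
print(imp["lmax_class"], imp["rmax_class"])   # 0 0 (np.argmax picks the first class on a tie)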

 def niave_min_impurity(xm, ym):
+    """
+    Find a discriminator which minimises the impurity of the data. The discriminator
+    is used to split data at a node.
+    This works by:
+    1. Selecting a data column as a discriminator.
+    2. Splitting the possible values of the discriminator into 1000 evenly spaced values
+       (between the minimum and maximum value in the dataset).
+    3. Selecting the discriminator column + value which minimises the impurity.
+    :param xm: input (X) data at the node.
+    :param ym: class (y) values at the node.
+    :return: dict containing the current naive minimum impurity.
+    """
     minxs = xm.min(axis=0)
     maxxs = xm.max(axis=0)
     # discriminator with the smallest impurity.
     cur_min_disc = None
+    # Choose a column to discriminate by.
     for x_idx, (dmin, dmax) in enumerate(zip(minxs, maxxs)):
-        disc_vals = np.linspace(dmin, dmax, 10)
+        # Create a set of possible values to use as the discriminator for that column.
+        disc_vals = np.linspace(dmin, dmax, 1000)
         for disc_val in disc_vals:
             selection = xm[:, x_idx] < disc_val
             yleft = ym[selection]
             yright = ym[selection==False]
+            # Calculate the impurity of this candidate split.
             imp = disc_val_impurity(yleft, yright)
+            # Keep the column + value with the smallest impurity so far.
             try:
                 if cur_min_disc["impurity"] > imp["impurity"]:
                     imp["discriminator"] = x_idx
@@ -99,14 +138,38 @@ def niave_min_impurity(xm, ym):
 class BinaryTreeClassifier:
     def __init__(self, max_depth=4, min_data=5):
+        """
+        Initialise the binary decision tree classifier. This classifier works by:
+        - Splitting the data into 2 sets at every node.
+        - These 2 sets are then split into 2 more sets at their nodes, and so on, until they reach a leaf.
+        - At the leaves, the data is classified into whatever class was most common in that leaf during training.
+        :param max_depth: The maximum depth the binary tree classifier goes to.
+        :param min_data: The minimum sample size of the training data below which the tree stops splitting.
+        """
         self.tree = dict()
         self.depth = max_depth
         self.min_data = min_data

     def _node_mask(X, node):
+        """
+        Get the discriminator mask for the node. This splits the data into left and right components.
+        :param X: dataset input data.
+        :param node: the current node of the tree, with its discriminator value.
+        :return: truth array, which splits the data left and right.
+        """
         return X[:, node["discriminator"]] < node["val"]

     def _apply_disc(X, y, node):
+        """
+        Apply the discriminator to the data at a given node.
+        :param X: dataset input.
+        :param y: dataset (observed) output.
+        :param node: The node to split the data by.
+        :return: The x and y data, split left and right.
+        """
         left_cond = BinaryTreeClassifier._node_mask(X, node)
         right_cond = left_cond == False
         left_X, left_y = X[left_cond], y[left_cond]
@@ -115,72 +178,134 @@ class BinaryTreeClassifier:
         return left_X, left_y, right_X, right_y

     def _tree_node(X, y, max_depth, min_data):
+        """
+        Create a tree node. This also creates the children of this node recursively.
+        :param X: input data for the dataset at a node.
+        :param y: output (observed) data for the dataset at a node.
+        :param max_depth: The maximum depth of the tree from this node.
+        :param min_data: The minimum amount of data which can be discriminated.
+        :return: The node + its children, as a dict.
+        """
+        # Get the new node, as a dict.
         node = niave_min_impurity(X, y)
+        # Split the data using the discriminator.
         left_X, left_y, right_X, right_y = BinaryTreeClassifier._apply_disc(X, y, node)
-        if max_depth > 0:
+        if max_depth > 1:
             if left_X.shape[0] >= min_data:
+                # Create a new node on the left (recursively) if max depth
+                # and min data have not been reached.
                 node["left"] = BinaryTreeClassifier._tree_node(left_X, left_y, max_depth-1, min_data)
             if right_X.shape[0] >= min_data:
+                # Create a new node on the right (recursively) if max depth
+                # and min data have not been reached.
                 node["right"] = BinaryTreeClassifier._tree_node(right_X, right_y, max_depth-1, min_data)
         return node
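For reference, a node built this way is a plain dict. An illustrative sketch of its shape (values invented):

node = {
    "impurity": 0.32,       # split impurity at this node
    "discriminator": 2,     # column index used to split
    "val": 2.45,            # threshold: rows with X[:, 2] < 2.45 go left
    "lmax_class": 0,        # majority class on the left side
    "rmax_class": 1,        # majority class on the right side
    "left": {...},          # child node dicts; absent when max_depth or
    "right": {...},         # min_data stopped the recursion
}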

     def _run_tree(X, node):
+        """
+        Run a node of the classifier on the data, recursively.
+        :param X: input data to classify.
+        :param node: The node to run on the data.
+        :return: The classified y (expected) data.
+        """
+        # Set up the y array.
         y = np.zeros(X.shape[0])
+        # Get the discriminator's left conditional.
         left_cond = BinaryTreeClassifier._node_mask(X, node)
+        # Right conditional.
         right_cond = left_cond == False
         try:
+            # Try to split the data further on the left side.
             y[left_cond] = BinaryTreeClassifier._run_tree(X[left_cond], node["left"])
         except KeyError:
-            y[left_cond] = node["lmax_group"]
+            # If we cannot split any further, use the majority class of the left side (this is a leaf).
+            y[left_cond] = node["lmax_class"]
         try:
+            # Try to split the data further on the right side.
             y[right_cond] = BinaryTreeClassifier._run_tree(X[right_cond], node["right"])
         except KeyError:
-            y[right_cond] = node["rmax_group"]
+            # If we cannot split any further, use the majority class of the right side (this is a leaf).
+            y[right_cond] = node["rmax_class"]
         return y

     def _node_dict(node, idx=0):
+        """
+        Get a dict of all the nodes, recursively. The keys are array indices,
+        as if the array were a heap.
+        :param node: The current node to add to the dict and to get the children of recursively.
+        :param idx: current index (key) of the node.
+        :return: dict containing all the nodes retrieved.
+        """
+        # Current node's data.
         nodes = {}
-        node_data = {"lmax_group": node["lmax_group"],
-                     "rmax_group": node["rmax_group"],
+        node_data = {"lmax_class": node["lmax_class"],
+                     "rmax_class": node["rmax_class"],
                      "discriminator": node["discriminator"],
                      "val": node["val"]}
         nodes[idx] = node_data
+        # Try to get the left nodes.
         try:
             left_idx = 2 * idx + 1
             nodes.update(BinaryTreeClassifier._node_dict(node["left"], left_idx))
         except KeyError:
             pass
+        # Try to get the right nodes.
         try:
             right_idx = 2 * idx + 2
             nodes.update(BinaryTreeClassifier._node_dict(node["right"], right_idx))
         except KeyError:
             pass
+        # Return the dict of nodes retrieved.
         return nodes

     def build_tree(self, X, y):
+        """
+        Build (train) the decision tree classifier.
+        :param X: input training data.
+        :param y: output training (observed) data.
+        :return: None
+        """
         self.tree = BinaryTreeClassifier._tree_node(X, y, self.depth, self.min_data)

     def classify(self, X):
+        """
+        Classify some data using the tree.
+        :param X: Input data.
+        :return: output (expected) classes of the data, or y values, for the given input.
+        """
         return BinaryTreeClassifier._run_tree(X, self.tree)

     def tree_to_heap_array(self):
+        """
+        Convert the tree to a binary heap, stored in an array with standard indexing,
+        i.e. a node at index i has children at 2*i + 1 and 2*i + 2 and a parent at (i - 1)//2.
+        :return: list containing the tree nodes.
+        """
         tree_dict = BinaryTreeClassifier._node_dict(self.tree)
         return [tree_dict[key] for key in sorted(tree_dict.keys())]
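A short navigation sketch, assuming a trained classifier (see the __main__ block below) and a tree full enough that the heap indices are dense; a sparse tree would leave index gaps that this compacted list does not preserve:

tree_arr = classifier.tree_to_heap_array()
i = 1                                   # the root's left child
parent = tree_arr[(i - 1) // 2]         # index 0: back at the root
left, right = 2 * i + 1, 2 * i + 2      # indices 3 and 4, if those nodes exist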

 def shuffle_split(x, y, frac=0.6):
     """
-    Shuffle and split X and y data.
+    Shuffle and split X and y data. "frac" is the ratio of the split,
+    e.g. 0.6 means 60% of the data goes into the left fraction and 40% into the right.
+    Note X and y are shuffled the same way, so row i in X is still matched with row i in y after the shuffle.
-    :param x:
-    :param y:
-    :param frac:
-    :return:
+    :param x: X values of the data (predictor).
+    :param y: y values of the data (observation).
+    :param frac: fraction to split the data by.
+    :return: x1, y1, x2, y2, where x1, y1 is the left fraction and x2, y2 is the right.
     """
     data_idx = np.arange(x.shape[0])
     sample1 = data_idx < (data_idx.max()*frac)
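A usage sketch (shapes and values invented for illustration):

import numpy as np

X = np.arange(20).reshape(10, 2)    # 10 samples, 2 features
y = np.arange(10) % 2
X1, y1, X2, y2 = shuffle_split(X, y, frac=0.6)
# Rows stay paired after the shuffle, and roughly 60% of the samples
# land in the first split.
assert X1.shape[0] == y1.shape[0] and X2.shape[0] == y2.shape[0]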
@@ -193,14 +318,25 @@ def shuffle_split(x, y, frac=0.6):

 if __name__ == "__main__":
+    # Set the seed for reproducible test results.
     np.random.seed(10)
+    # Test the decision tree with the iris data.
     iris_data = datasets.load_iris()
     X = iris_data["data"]
     y = iris_data["target"]
+    # Split the iris data into test and train sets.
     X_train, y_train, X_test, y_test = shuffle_split(X, y)
+    # Create the decision tree classifier.
     classifier = BinaryTreeClassifier()
+    # Train the classifier.
     classifier.build_tree(X_train, y_train)
+    # Get the result of applying the classifier to the test data.
     result = classifier.classify(X_test)
+    # Get the accuracy of the classifier:
+    # accuracy = (number of correct results)/(total number of results)
     print("accuracy:", (result == y_test).sum()/(result.shape[0]))
+    # Convert the tree into a heap array and print it.
     tree_arr = classifier.tree_to_heap_array()
-    pass
+    print("heap:")
+    for i, node in enumerate(tree_arr):
+        print(i, node)
