diff --git a/ML Cookbook/DecisionTree.py b/ML Cookbook/DecisionTree.py
index ea098ab..312efe3 100644
--- a/ML Cookbook/DecisionTree.py
+++ b/ML Cookbook/DecisionTree.py
@@ -1,5 +1,5 @@
 """
-:Author: james
+:Author: James Sherratt
 :Date: 21/10/2019
 
 :License: MIT
@@ -7,6 +7,10 @@
 
 Basic implementation of a binary decision tree algorithm, with one
 discriminant per node.
+
+Useful links:
+https://scikit-learn.org/stable/modules/tree.html
+https://en.wikipedia.org/wiki/Decision_tree
 """
 
 import numpy as np
@@ -18,7 +22,7 @@ def proportion_k(ym):
     Get the proportions of each class in the current set of values.
 
     :param ym: y values (class) of the data at a given node.
-    :return:
+    :return: list containing the classes and the fraction of each class present.
     """
     counts = list(np.unique(ym, return_counts=True))
     counts[1] = counts[1]/(ym.shape[0])
@@ -27,34 +31,51 @@
 
 def gini(k_proportions):
     """
-    Gini impurity function.
+    Gini impurity function. This is used to determine the impurity of a given
+    set of data, given the proportions of the classes in the dataset.
+
+    This is equivalent to:
+    H = ∑ pk(1-pk) for all k classes.
 
-    :param k_proportions:
-    :return:
+    k_proportions, in this case, is an array of pk's.
+
+    :param k_proportions: array containing the proportions of the different classes. Proportions sum to 1.
+    :return: the impurity of the dataset.
     """
     return (k_proportions*(1-k_proportions)).sum()
 
 
 def node_impurity(ym):
     """
-    Calculate the impurity of data at a given node of the tree.
+    Calculate the impurity of the data on one side of a node after a split.
 
-    :param ym:
-    :return:
+    :param ym: Actual y data for the selected dataset.
+    :return: dict containing the impurity value of that side and the most common class on that side.
     """
     if ym.shape[0] == 0:
-        return {"impurity": 0, "max_group": 0}
+        return {"impurity": 0, "max_class": 0}
     k_prop = proportion_k(ym)
-    return {"impurity": gini(k_prop[1]), "max_group": k_prop[0][np.argmax(k_prop[1])]}
+    return {"impurity": gini(k_prop[1]), "max_class": k_prop[0][np.argmax(k_prop[1])]}
 
 
 def disc_val_impurity(yleft, yright):
     """
-    Calculate the level of impurity left in the given data split.
+    Calculate the level of impurity left in the given data after splitting. This returns
+    a dict which contains:
+
+    - The impurity of the data after being split.
+    - The class with the largest proportion on the left and on the right side of the split.
 
-    :param yleft:
-    :param yright:
-    :return:
+    The aim is to find a split which minimises the impurity.
+
+    The impurity calculated is:
+    G = (nleft/ntot)*Hleft + (nright/ntot)*Hright
+
+    This gives the impurity of the split data.
+
+    :param yleft: Real/training y values for the data on the left.
+    :param yright: Real/training y values for the data on the right.
+    :return: dict containing the data impurity after the split and the most common class on the left and right of the split.
     """
     nleft = yleft.shape[0]
     nright = yright.shape[0]
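As a quick sanity check of the three helpers above, here is a small worked example (a sketch, assuming the patched functions are in scope, including the max_class renames continued in the next hunk; the values follow directly from H = ∑ pk(1-pk)):

    import numpy as np

    # A perfectly mixed node: p0 = p1 = 0.5, so H = 0.5*0.5 + 0.5*0.5 = 0.5.
    classes, props = proportion_k(np.array([0, 0, 1, 1]))
    print(gini(props))  # 0.5

    # A pure node: p0 = 1, so H = 1*(1 - 1) = 0.
    print(gini(proportion_k(np.array([0, 0, 0]))[1]))  # 0.0

    # A split that separates two classes perfectly scores G = 0, and the
    # majority class on each side is reported for later use at the leaves.
    result = disc_val_impurity(np.array([0, 0]), np.array([1, 1]))
    print(result)  # impurity 0.0, lmax_class 0, rmax_class 1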
@@ -64,25 +85,43 @@
     return {
         "impurity": ((nleft/ntot)*left_imp["impurity"])+((nright/ntot)*right_imp["impurity"]),
-        "lmax_group": left_imp["max_group"],
-        "rmax_group": right_imp["max_group"]
+        "lmax_class": left_imp["max_class"],
+        "rmax_class": right_imp["max_class"]
     }
 
 
 def niave_min_impurity(xm, ym):
+    """
+    Find a discriminator which minimises the impurity of the data. The discriminator
+    is used to split the data at a node.
+
+    This works by:
+    1. Selecting a data column as a discriminator.
+    2. Splitting the possible values of the discriminator into 1000 evenly spaced values
+       (between the minimum and maximum value of that column in the dataset).
+    3. Selecting the discriminator column + value which minimises the impurity.
+
+    :param xm: x (input) values of the data at the node.
+    :param ym: y (class) values of the data at the node.
+    :return: dict containing the current naive minimum impurity, plus the discriminator column and value which produced it.
+    """
     minxs = xm.min(axis=0)
     maxxs = xm.max(axis=0)
     # discriminator with the smallest impurity.
     cur_min_disc = None
+    # Choose a column to discriminate by.
     for x_idx, (dmin, dmax) in enumerate(zip(minxs, maxxs)):
-        disc_vals = np.linspace(dmin, dmax, 10)
+        # Create a set of possible values to use as the discriminator for that column.
+        disc_vals = np.linspace(dmin, dmax, 1000)
         for disc_val in disc_vals:
             selection = xm[:, x_idx] < disc_val
             yleft = ym[selection]
             yright = ym[selection==False]
+            # Calculate the impurity of this split.
             imp = disc_val_impurity(yleft, yright)
+            # Keep the discriminator (column + value) with the smallest impurity so far.
             try:
                 if cur_min_disc["impurity"] > imp["impurity"]:
                     imp["discriminator"] = x_idx
@@ -99,14 +138,38 @@
 class BinaryTreeClassifier:
 
     def __init__(self, max_depth=4, min_data=5):
+        """
+        Initialise the binary decision tree classifier. This classifier works by:
+        - Splitting the data into 2 sets at every node.
+        - These 2 sets are then split into 2 more sets at their nodes etc. until they reach a leaf.
+        - At the leaves, the data is classified into whatever class was "most common" in that leaf during training.
+
+        :param max_depth: The maximum depth the binary tree classifier goes to.
+        :param min_data: The minimum sample size of the training data before the tree stops splitting.
+        """
-        tree = dict()
+        self.tree = dict()
         self.depth = max_depth
         self.min_data = min_data
 
     def _node_mask(X, node):
+        """
+        Get the discriminator mask for the node. This splits the data into left and right components.
+
+        :param X: dataset input data.
+        :param node: the current node of the tree, with its discriminator value.
+        :return: truth array, which splits the data left and right.
+        """
         return X[:, node["discriminator"]] < node["val"]
 
     def _apply_disc(X, y, node):
+        """
+        Apply the discriminator to the data at a given node.
+
+        :param X: dataset input.
+        :param y: dataset (observed) output.
+        :param node: The node to split the data by.
+        :return: The x and y data, split left and right.
+        """
         left_cond = BinaryTreeClassifier._node_mask(X, node)
         right_cond = left_cond == False
         left_X, left_y = X[left_cond], y[left_cond]
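The scan in niave_min_impurity is easy to sanity-check on a toy dataset: any threshold in the gap between the two clusters of column 0 separates the classes completely, so the search drives the impurity to zero. A minimal sketch (assuming the functions above are in scope; the exact "val" found depends on the 1000-point linspace grid):

    import numpy as np

    xm = np.array([[1.0], [2.0], [10.0], [11.0]])  # a single feature column
    ym = np.array([0, 0, 1, 1])

    best = niave_min_impurity(xm, ym)
    # Expect: column 0, impurity 0.0, and a split value in the gap (2.0, 10.0].
    print(best["discriminator"], best["impurity"], best["val"])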
node["left"] = BinaryTreeClassifier._tree_node(left_X, left_y, max_depth-1, min_data) if right_X.shape[0] >= min_data: + # Create a new node on the right (recursively) if max depth + # and min data have not been reached. node["right"] = BinaryTreeClassifier._tree_node(right_X, right_y, max_depth-1, min_data) return node def _run_tree(X, node): + """ + Run a node of the classifier, recurisively. + + :param node: The node to run on the data. + :return: The classified y (expected) data. + """ + # Setup y array. y = np.zeros(X.shape[0]) + # Get the discriminator left conditional. left_cond = BinaryTreeClassifier._node_mask(X, node) + # Right conditional right_cond = left_cond == False try: + # Try to split the data further on the left side. y[left_cond] = BinaryTreeClassifier._run_tree(X[left_cond], node["left"]) except KeyError: - y[left_cond] = node["lmax_group"] + # If we cannot split any further, get the class of the data on the left (as this is a leaf). + y[left_cond] = node["lmax_class"] try: + # Try to split the data further on the right side. y[right_cond] = BinaryTreeClassifier._run_tree(X[right_cond], node["right"]) except KeyError: - y[right_cond] = node["rmax_group"] + # If we cannot split any further, get the class of the data on the right (as this is a leaf). + y[right_cond] = node["rmax_class"] return y def _node_dict(node, idx=0): + """ + Get a dict of all the nodes, recursively. The keys are the index of an array, + as if the array is a heap. + + :param node: The current node to add to the dict and to get children of recursively. + :param idx: current index (key) of the node. + :return: dict containing all the nodes retrieved. + """ + # Current nodes. nodes = {} - node_data = {"lmax_group": node["lmax_group"], - "rmax_group": node["rmax_group"], + node_data = {"lmax_class": node["lmax_class"], + "rmax_class": node["rmax_class"], "discriminator": node["discriminator"], "val": node["val"]} nodes[idx] = node_data + + # Try to get the left nodes. try: left_idx = 2 * idx + 1 nodes.update(BinaryTreeClassifier._node_dict(node["left"], left_idx)) except KeyError: pass + # Try to get the right nodes. try: right_idx = 2 * idx + 2 nodes.update(BinaryTreeClassifier._node_dict(node["right"], right_idx)) except KeyError: pass + # return the dict of nodes retrieved. return nodes def build_tree(self, X, y): + """ + Build (train) the decision tree classifier. + + :param X: input training data. + :param y: output training (observed) data. + :return: None + """ self.tree = BinaryTreeClassifier._tree_node(X, y, self.depth, self.min_data) def classify(self, X): + """ + Classify some data using the tree. + + :param X: Input data. + :return: output (expected) classes of the data, or y values, for the given input. + """ return BinaryTreeClassifier._run_tree(X, self.tree) def tree_to_heap_array(self): + """ + Convert the tree to a binary heap, stored in an array with standard indexing. + i.e. a node at index i has children at 2i*1 and 2i+2 and a parent at (i-1)//2. + + :return: list containing the tree nodes. + """ tree_dict = BinaryTreeClassifier._node_dict(self.tree) return [tree_dict[key] for key in sorted(tree_dict.keys())] def shuffle_split(x, y, frac=0.6): """ - Shuffle and split X and y data. - - :param x: - :param y: - :param frac: - :return: + Shuffle and split X and y data. "frac" is the ratio of the split. + e.g. 0.6 means 60% of the data goes into the left fraction, 40% into the right. + Note X and y are shuffled the same, so row i in X data is still matched with row i in y after shuffle. 
@@ -193,14 +318,25 @@
 if __name__ == "__main__":
+    # Set the seed for repeatable test results.
     np.random.seed(10)
+    # Test the decision tree with the iris data.
     iris_data = datasets.load_iris()
     X = iris_data["data"]
     y = iris_data["target"]
+    # Split the iris data into test and train sets.
     X_train, y_train, X_test, y_test = shuffle_split(X, y)
 
+    # Create the decision tree classifier.
     classifier = BinaryTreeClassifier()
+    # Train the classifier.
     classifier.build_tree(X_train, y_train)
+    # Get the result when the classifier is applied to the test data.
     result = classifier.classify(X_test)
+    # Get the accuracy of the classifier:
+    # accuracy = (number of correct results)/(total number of results)
     print("accuracy:", (result == y_test).sum()/(result.shape[0]))
+    # Convert the tree into a heap array and print it.
     tree_arr = classifier.tree_to_heap_array()
-    pass
+    print("heap:")
+    for i, node in enumerate(tree_arr):
+        print(i, node)
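As a rough cross-check of the accuracy printed above, the same train/test flow can be run through scikit-learn's decision tree (the first of the "Useful links" in the module docstring). This is a sketch, assuming scikit-learn is installed and X_train, y_train, X_test, y_test come from shuffle_split as in the __main__ block; min_samples_split plays approximately the role of min_data here:

    from sklearn.tree import DecisionTreeClassifier

    sk_clf = DecisionTreeClassifier(max_depth=4, min_samples_split=5, random_state=10)
    sk_clf.fit(X_train, y_train)
    print("sklearn accuracy:", (sk_clf.predict(X_test) == y_test).mean())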