Module ktrain.utils

from .imports import *

# ------------------------------------------------------------------------------
# KTRAIN DEFAULTS
# ------------------------------------------------------------------------------
DEFAULT_WD = 0.01


def get_default_optimizer(lr=0.001, wd=DEFAULT_WD):
    from .lroptimize.optimization import AdamWeightDecay

    opt = AdamWeightDecay(
        learning_rate=lr,
        weight_decay_rate=wd,
        beta_1=0.9,
        beta_2=0.999,
        epsilon=1e-6,
        exclude_from_weight_decay=["layer_norm", "bias"],
    )
    return opt


# Use vanilla Adam as default unless weight decay is explicitly set by user
# in which case AdamWeightDecay is default optimizer.
# See core.Learner.set_weight_decay for more information
# dep_fix
if "tensorflow" in sys.modules:
    DEFAULT_OPT = (
        "adam"
        if version.parse(tf.__version__) < version.parse("2.11")
        else tf.keras.optimizers.legacy.Adam()
    )
else:
    DEFAULT_OPT = "adam"
DEFAULT_BS = 32
DEFAULT_ES = 5
DEFAULT_ROP = 2
# from .lroptimize.optimization import AdamWeightDecay
# DEFAULT_OPT = AdamWeightDecay(learning_rate=0.001,
# weight_decay_rate=0.01,
# beta_1=0.9,
# beta_2=0.999,
# epsilon=1e-6,
# exclude_from_weight_decay=['layer_norm', 'bias'])
DEFAULT_TRANSFORMER_LAYERS = [-2]  # second-to-last hidden state
DEFAULT_TRANSFORMER_MAXLEN = 512
DEFAULT_TRANSFORMER_NUM_SPECIAL = 2
MODEL_BASENAME = "tf_model"
MODEL_NAME = MODEL_BASENAME + ".h5"
PREPROC_NAME = MODEL_BASENAME + ".preproc"


# ------------------------------------------------------------------------------
# DATA/MODEL INSPECTORS
# ------------------------------------------------------------------------------


def is_ktrain_dataset(data):
    from .dataset import Dataset

    return isinstance(data, Dataset)


def loss_fn_from_model(model):
    # dep_fix
    if version.parse(tf.__version__) < version.parse("2.2") or DISABLE_V2_BEHAVIOR:
        return model.loss_functions[0].fn
    else:  # TF >= 2.2.0
        return model.compiled_loss._get_loss_object(
            model.compiled_loss._losses[0].name
        ).fn


def metrics_from_model(model):
    msg = "Could not retrieve metrics list from compiled model"

    # dep_fix
    if version.parse(tf.__version__) < version.parse("2.2") or DISABLE_V2_BEHAVIOR:
        return model._compile_metrics
        # return [m.name for m in model.metrics] if is_tf_keras() else model.metrics
    else:  # TF >= 2.2.0
        mlist = model.compiled_metrics._metrics
        if isinstance(mlist, list) and isinstance(
            mlist[0], str
        ):  # metrics are strings prior to training
            return mlist
        elif isinstance(mlist, list) and isinstance(mlist[0], list):
            try:
                return [m.name for m in mlist[0]]
            except:
                warnings.warn(msg)
                return []
        elif isinstance(mlist, list) and hasattr(
            mlist[0], "name"
        ):  # tf.keras.metrics.AUC()
            try:
                return [m.name for m in mlist]
            except:
                warnings.warn(msg)
                return []

        else:
            warnings.warn(msg)
            return []


def is_classifier(model):
    """
    checks for classification and multilabel from model
    """
    is_classifier = False
    is_multilabel = False

    # get loss name
    loss = model.loss
    if callable(loss):
        if hasattr(loss, "__name__"):
            loss = loss.__name__
        elif hasattr(loss, "name"):
            loss = loss.name
        else:
            raise Exception("could not get loss name")

    # check for classification
    if loss in [
        "categorical_crossentropy",
        "sparse_categorical_crossentropy",
        "binary_crossentropy",
    ]:
        is_classifier = True
    else:
        mlist = metrics_from_model(model)
        if isinstance(mlist, (list, np.ndarray)) and any(
            ["accuracy" in m for m in mlist]
        ):
            is_classifier = True
        elif isinstance(mlist, (list, np.ndarray)) and any(["auc" in m for m in mlist]):
            is_classifier = True

    # check for multilabel
    if loss == "binary_crossentropy":
        if is_huggingface(model=model):
            is_multilabel = True
        else:
            last = model.layers[-1]
            output_shape = last.output_shape
            mult_output = (
                True if len(output_shape) == 2 and output_shape[1] > 1 else False
            )
            if (
                (
                    hasattr(last, "activation")
                    and isinstance(last.activation, type(keras.activations.sigmoid))
                )
                or isinstance(last, type(keras.activations.sigmoid))
            ) and mult_output:
                is_multilabel = True
    return (is_classifier, is_multilabel)


def is_tabular_from_data(data):
    return type(data).__name__ in ["TabularDataset"]


def is_huggingface(model=None, data=None):
    """
    checks for Hugging Face transformer model
    from model and/or data
    """
    huggingface = False
    if model is not None and is_huggingface_from_model(model):
        huggingface = True
    elif data is not None and is_huggingface_from_data(data):
        huggingface = True
    return huggingface


def is_huggingface_from_model(model):
    # 20201202: support both transformers<4.0 and transformers>=4.0
    return "transformers.modeling_tf" in str(
        type(model)
    ) or "transformers.models" in str(type(model))


def is_huggingface_from_data(data):
    return type(data).__name__ in ["TransformerDataset"]


def is_ner(model=None, data=None):
    ner = False
    if data is None:
        warnings.warn("is_ner only detects CRF-based NER models when data is None")
    if model is not None and is_crf(model):
        ner = True
    elif data is not None and is_ner_from_data(data):
        ner = True
    return ner


def is_crf(model):
    """
    checks for CRF sequence tagger.
    """
    # loss = model.loss
    # if callable(loss):
    # if hasattr(loss, '__name__'):
    # loss = loss.__name__
    # elif hasattr(loss, 'name'):
    # loss = loss.name
    # else:
    # raise Exception('could not get loss name')
    # return loss == 'crf_loss' or 'CRF.loss_function' in str(model.loss)
    return type(model.layers[-1]).__name__ == "CRF"


# def is_ner_from_model(model):
# """
# checks for sequence tagger.
# Curently, only checks for a CRF-based sequence tagger
# """
# loss = model.loss
# if callable(loss):
# if hasattr(loss, '__name__'):
# loss = loss.__name__
# elif hasattr(loss, 'name'):
# loss = loss.name
# else:
# raise Exception('could not get loss name')

# return loss == 'crf_loss' or 'CRF.loss_function' in str(model.loss)


def is_ner_from_data(data):
    return type(data).__name__ == "NERSequence"


def is_nodeclass(model=None, data=None):
    result = False
    if data is not None and type(data).__name__ == "NodeSequenceWrapper":
        result = True
    return result


def is_linkpred(model=None, data=None):
    result = False
    if data is not None and type(data).__name__ == "LinkSequenceWrapper":
        result = True
    return result


def is_imageclass_from_data(data):
    return type(data).__name__ in [
        "DirectoryIterator",
        "DataFrameIterator",
        "NumpyArrayIterator",
    ]


def is_regression_from_data(data):
    """
    checks for regression task from data
    """
    data_arg_check(val_data=data, val_required=True)
    if is_ner(data=data):
        return False  # NERSequence
    elif is_nodeclass(data=data):
        return False  # NodeSequenceWrapper
    elif is_linkpred(data=data):
        return False  # LinkSequenceWrapper
    Y = y_from_data(data)
    if len(Y.shape) == 1 or (len(Y.shape) > 1 and Y.shape[1] == 1):
        return True
    return False


def is_multilabel(data):
    """
    checks for multilabel from data
    """
    data_arg_check(val_data=data, val_required=True)
    if is_ner(data=data):
        return False  # NERSequence
    elif is_nodeclass(data=data):
        return False  # NodeSequenceWrapper
    elif is_linkpred(data=data):
        return False  # LinkSequenceWrapper
    multilabel = False
    Y = y_from_data(data)
    if len(Y.shape) == 1 or (len(Y.shape) > 1 and Y.shape[1] == 1):
        return False
    for idx, y in enumerate(Y):
        if idx >= 1024:
            break
        if np.issubdtype(type(y), np.integer) or np.issubdtype(type(y), np.floating):
            return False
        total_for_example = sum(y)
        if total_for_example > 1:
            multilabel = True
            break
    return multilabel


def shape_from_data(data):
    err_msg = "could not determine shape from %s" % (type(data))
    if is_iter(data):
        if is_ktrain_dataset(data):
            return data.xshape()
        elif hasattr(data, "image_shape"):
            return data.image_shape  # DirectoryIterator/DataFrameIterator
        elif hasattr(data, "x"):  # NumpyIterator
            return data.x.shape[1:]
        else:
            try:
                return data[0][0].shape[1:]
            except:
                raise Exception(err_msg)
    else:
        try:
            if type(data[0]) == list:  # BERT-style tuple
                return data[0][0].shape
            else:
                return data[0].shape  # standard tuple
        except:
            raise Exception(err_msg)


def ondisk(data):
    if hasattr(data, "ondisk"):
        return data.ondisk()

    ondisk = is_iter(data) and (type(data).__name__ not in ["NumpyArrayIterator"])
    return ondisk


def nsamples_from_data(data):
    err_msg = "could not determine number of samples from %s" % (type(data))
    if is_iter(data):
        if is_ktrain_dataset(data):
            return data.nsamples()
        elif hasattr(data, "samples"):  # DirectoryIterator/DataFrameIterator
            return data.samples
        elif hasattr(data, "n"):  # DirectoryIterator/DataFrameIterator/NumpyIterator
            return data.n
        else:
            raise Exception(err_msg)
    else:
        try:
            if type(data[0]) == list:  # BERT-style tuple
                return len(data[0][0])
            else:
                return len(data[0])  # standard tuple
        except:
            raise Exception(err_msg)


def nclasses_from_data(data):
    if is_iter(data):
        if is_ktrain_dataset(data):
            return data.nclasses()
        elif hasattr(data, "classes"):  # DirectoryIterator
            return len(set(data.classes))
        else:
            try:
                return data[0][1].shape[1]  # DataFrameIterator/NumpyIterator
            except:
                raise Exception(
                    "could not determine number of classes from %s" % (type(data))
                )
    else:
        try:
            return data[1].shape[1]
        except:
            raise Exception(
                "could not determine number of classes from %s" % (type(data))
            )


def y_from_data(data):
    if is_iter(data):
        if is_ktrain_dataset(data):
            return data.get_y()
        elif hasattr(data, "classes"):  # DirectoryIterator
            return keras.utils.to_categorical(data.classes)
        elif hasattr(data, "labels"):  # DataFrameIterator
            return data.labels
        elif hasattr(data, "y"):  # NumpyArrayIterator
            # return to_categorical(data.y)
            return data.y
        else:
            raise Exception(
                "could not determine number of classes from %s" % (type(data))
            )
    else:
        try:
            return data[1]
        except:
            raise Exception(
                "could not determine number of classes from %s" % (type(data))
            )


def is_iter(data, ignore=False):
    if ignore:
        return True
    iter_classes = ["NumpyArrayIterator", "DirectoryIterator", "DataFrameIterator"]
    return data.__class__.__name__ in iter_classes or is_ktrain_dataset(data)


def data_arg_check(
    train_data=None,
    val_data=None,
    train_required=False,
    val_required=False,
    ndarray_only=False,
):
    if train_required and train_data is None:
        raise ValueError("train_data is required")
    if val_required and val_data is None:
        raise ValueError("val_data is required")
    if train_data is not None and not is_iter(train_data, ndarray_only):
        if bad_data_tuple(train_data):
            err_msg = "data must be tuple of numpy.ndarrays"
            if not ndarray_only:
                err_msg += " or an instance of ktrain.Dataset"
            raise ValueError(err_msg)
    if val_data is not None and not is_iter(val_data, ndarray_only):
        if bad_data_tuple(val_data):
            err_msg = "data must be tuple of numpy.ndarrays or BERT-style tuple"
            if not ndarray_only:
                err_msg += " or an instance of Iterator"
            raise ValueError(err_msg)
    return


def bert_data_tuple(data):
    """
    checks if data tuple is BERT-style format
    """
    if is_iter(data):
        return False
    if (
        type(data[0]) == list
        and len(data[0]) == 2
        and type(data[0][0]) is np.ndarray
        and type(data[0][1]) is np.ndarray
        and type(data[1]) is np.ndarray
        and np.count_nonzero(data[0][1]) == 0
    ):
        return True
    else:
        return False


def bad_data_tuple(data):
    """
    Checks for standard tuple or BERT-style tuple
    """
    if (
        not isinstance(data, tuple)
        or len(data) != 2
        or type(data[0]) not in [np.ndarray, list]
        or (type(data[0]) in [list] and type(data[0][0]) is not np.ndarray)
        or type(data[1]) is not np.ndarray
    ):
        return True
    else:
        return False


# ------------------------------------------------------------------------------
# PLOTTING UTILITIES
# ------------------------------------------------------------------------------


# plots images with labels within jupyter notebook
def plots(ims, figsize=(12, 6), rows=1, interp=False, titles=None):
    # if type(ims[0]) is np.ndarray:
    # ims = np.array(ims).astype(np.uint8)
    # if (ims.shape[-1] != 3):
    # ims = ims.transpose((0,2,3,1))
    f = plt.figure(figsize=figsize)
    cols = len(ims) // rows if len(ims) % rows == 0 else len(ims) // rows + 1
    for i in range(len(ims)):
        sp = f.add_subplot(rows, cols, i + 1)
        sp.axis("Off")
        if titles is not None:
            sp.set_title(titles[i], fontsize=16)
        plt.imshow(ims[i], interpolation=None if interp else "none")


def plot_confusion_matrix(
    cm, classes, normalize=False, title="Confusion matrix", cmap=plt.cm.Blues
):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation="nearest", cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print("Confusion matrix, without normalization")

    print(cm)

    thresh = cm.max() / 2.0
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(
            j,
            i,
            cm[i, j],
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black",
        )

    plt.tight_layout()
    plt.ylabel("True label")
    plt.xlabel("Predicted label")


# ------------------------------------------------------------------------------
# DOWNLOAD UTILITIES
# ------------------------------------------------------------------------------


def download(url, filename):
    with open(filename, "wb") as f:
        response = requests.get(url, stream=True, verify=False)
        total = response.headers.get("content-length")

        if total is None:
            f.write(response.content)
        else:
            downloaded = 0
            total = int(total)
            # print(total)
            for data in response.iter_content(
                chunk_size=max(int(total / 1000), 1024 * 1024)
            ):
                downloaded += len(data)
                f.write(data)
                done = int(50 * downloaded / total)
                sys.stdout.write("\r[{}{}]".format("█" * done, "." * (50 - done)))
                sys.stdout.flush()


def get_ktrain_data():
    home = os.path.expanduser("~")
    ktrain_data = os.path.join(home, "ktrain_data")
    if not os.path.isdir(ktrain_data):
        os.mkdir(ktrain_data)
    return ktrain_data


# ------------------------------------------------------------------------------
# MISC UTILITIES
# ------------------------------------------------------------------------------

from subprocess import Popen


def checkjava(path=None):
    """
    Checks if a Java executable is available for Tika.
    Args:
        path(str): path to java executable
    Returns:
        True if Java is available, False otherwise
    """

    # Get path to java executable if path not set
    if not path:
        path = os.getenv("TIKA_JAVA", "java")

    # Check if java binary is available on path
    try:
        _ = Popen(path, stdout=open(os.devnull, "w"), stderr=open(os.devnull, "w"))
    except:
        return False
    return True


def batchify(X, size):
    """
    ```
    Splits X into separate batches of the size specified by size.
    Args:
        X(list): elements
        size(int): batch size
    Returns:
        list of evenly sized batches with the last batch having the remaining elements
    ```
    """
    return [X[x : x + size] for x in range(0, len(X), size)]


def list2chunks(a, n):
    k, m = divmod(len(a), n)
    return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))


def check_array(X, y=None, X_name="X", y_name="targets"):
    if not isinstance(X, (list, np.ndarray)):
        raise ValueError("%s must be a list or NumPy array" % X_name)
    if y is not None and not isinstance(y, (list, np.ndarray)):
        raise ValueError("%s must be a list or NumPy array" % y_name)
    return


def is_tf_keras():
    if keras.__name__ == "keras":
        is_tf_keras = False
    elif (
        keras.__name__
        in ["tensorflow.keras", "tensorflow.python.keras", "tensorflow_core.keras"]
        or keras.__version__[-3:] == "-tf"
    ):
        is_tf_keras = True
    else:
        raise KeyError("Cannot detect if using keras or tf.keras.")
    return is_tf_keras


def vprint(s=None, verbose=1):
    if not s:
        s = "\n"
    if verbose:
        print(s)


def add_headers_to_df(fname_in, header_dict, fname_out=None):
    df = pd.read_csv(fname_in, header=None)
    df.rename(columns=header_dict, inplace=True)
    if fname_out is None:
        name, ext = os.path.splitext(fname_in)
        name += "-headers"
        fname_out = name + ext  # ext from splitext already includes the leading dot
    df.to_csv(fname_out, index=False)  # save to new csv file
    return


def get_random_colors(n, name="hsv", hex_format=True):
    """Returns a function that maps each index in 0, 1, ..., n-1 to a distinct
    RGB color; the keyword argument name must be a standard mpl colormap name."""
    cmap = plt.cm.get_cmap(name, n)
    result = []
    for i in range(n):
        color = cmap(i)
        if hex_format:
            color = rgb2hex(color)
        result.append(color)
    return np.array(result)


def get_hf_model_name(model_id):
    parts = model_id.split("/")
    if len(parts) == 1:
        model_id = parts[0]
    else:
        model_id = "/".join(parts[1:])
    if model_id.startswith("xlm-roberta"):
        model_name = "xlm-roberta"
    else:
        model_name = model_id.split("-")[0]
    return model_name


# ------------------------------------------------------------------------------
# target-handling
# ------------------------------------------------------------------------------
class YTransform:
    def __init__(self, class_names=[], label_encoder=None):
        """
        ```
        Checks and transforms an array of targets; the transformed targets are returned.
        Args:
          class_names(list):  labels associated with targets (e.g., ['negative', 'positive'])
                         Only used/required if:
                         1. targets are one/multi-hot-encoded
                         2. targets are integers and represent class IDs for classification task
                         Not required if:
                         1. targets are numeric and task is regression
                         2. targets are strings and task is classification (class_names are populated automatically)
          label_encoder(LabelEncoder): a prior instance of LabelEncoder.
                                       If None, will be created when train=True
        ```
        """
        if type(class_names) != list:
            if isinstance(class_names, (pd.Series, np.ndarray)):
                class_names = class_names.tolist()
            else:
                raise ValueError("class_names must be list")
        self.c = class_names
        self.le = label_encoder
        self.train_called = False

    def get_classes(self):
        return self.c

    def set_classes(self, class_names):
        self.c = (
            class_names.tolist() if isinstance(class_names, np.ndarray) else class_names
        )

    def apply(self, targets, train=True):
        if targets is None and train:
            raise ValueError("targets is None")
        elif targets is None and not train:
            return

        # validate labels against data
        targets = np.array(targets) if type(targets) == list else targets
        if len(targets.shape) > 1 and targets.shape[1] == 1:
            targets = np.squeeze(targets, axis=1)

        # handle numeric targets (regression)
        if len(targets.shape) == 1 and not isinstance(targets[0], str):
            # numeric targets
            if not self.get_classes() and train:
                warnings.warn(
                    "Task is being treated as REGRESSION because "
                    + "either class_names argument was not supplied or is_regression=True. "
                    + "If this is incorrect, change accordingly."
                )
            if not self.get_classes():
                targets = np.array(targets, dtype=np.float32)
        # string targets (classification)
        elif len(targets.shape) == 1 and isinstance(targets[0], str):
            if not train and self.le is None:
                raise ValueError(
                    "LabelEncoder has not been trained. Call with train=True"
                )
            if train:
                self.le = LabelEncoder()
                self.le.fit(targets)
                if self.get_classes():
                    warnings.warn(
                        "class_names argument was ignored, as they were extracted from string labels in dataset"
                    )
                self.set_classes(self.le.classes_)
            targets = self.le.transform(
                targets
        )  # convert to numerical targets for classification
        # handle categorical targets (classification)
        elif len(targets.shape) > 1:
            if not self.get_classes():
                raise ValueError(
                    "targets are 1-hot or multi-hot encoded but class_names is empty. "
                    + "The classes argument should have been supplied."
                )
            else:
                if train and len(self.get_classes()) != targets.shape[1]:
                    raise ValueError(
                        "training targets suggest %s classes, but class_names are %s"
                        % (targets.shape[1], self.get_classes())
                    )

        # numeric targets (classification)
        if len(targets.shape) == 1 and self.get_classes():
            if np.issubdtype(type(max(targets)), np.floating):
                warnings.warn(
                    "class_names implies classification but targets array contains float(s) instead of integers or strings"
                )

            if train and (len(set(targets)) != int(max(targets) + 1)):
                raise ValueError(
                    "len(set(targets) is %s but max(targets)+1 is  %s"
                    % (len(set(targets)), int(max(targets) + 1))
                )
            targets = keras.utils.to_categorical(
                targets, num_classes=len(self.get_classes())
            )
        if train:
            self.train_called = True
        return targets

    def apply_train(self, targets):
        return self.apply(targets, train=True)

    def apply_test(self, targets):
        return self.apply(targets, train=False)


class YTransformDataFrame(YTransform):
    def __init__(self, label_columns=[], is_regression=False):
        """
        ```
        Checks and transforms label columns in a DataFrame; a transformed copy of the DataFrame is returned.
        Args:
          label_columns(list): list of columns storing labels
          is_regression(bool): If True, task is regression and integer targets are treated as numeric dependent variable.
                               If False, task is classification and integer targets are treated as class IDs.
        ```
        """
        self.is_regression = is_regression
        if isinstance(label_columns, str):
            label_columns = [label_columns]
        self.label_columns = label_columns
        if not label_columns:
            raise ValueError("label_columns is required")
        self.label_columns = (
            [self.label_columns]
            if isinstance(self.label_columns, str)
            else self.label_columns
        )
        # class_names = label_columns if len(label_columns) > 1 else []
        super().__init__(class_names=[])

    def get_label_columns(self, squeeze=True):
        """
        Returns label columns of transformed DataFrame
        """
        if not self.train_called:
            raise Exception("apply_train should be called first")
        if not self.is_regression:
            new_lab_cols = self.c
        else:
            new_lab_cols = self.label_columns
        return new_lab_cols[0] if len(new_lab_cols) == 1 and squeeze else new_lab_cols

    def apply(self, df, train=True):
        df = (
            df.copy()
        )  # dep_fix: SettingWithCopy - prevent original DataFrame from losing old label columns

        labels_exist = True
        lst = self.label_columns[:]
        if not all(x in df.columns.values for x in lst):
            labels_exist = False
        if train and not labels_exist:
            raise ValueError(
                "dataframe is missing label columns: %s" % (self.label_columns)
            )

        # extract targets
        # todo: sort?
        if len(self.label_columns) > 1:
            if train and self.is_regression:
                warnings.warn(
                    "is_regression=True was supplied but ignored because multiple label columns imply classification"
                )
            cols = df.columns.values
            missing_cols = []
            for l in self.label_columns:
                if l not in df.columns.values:
                    missing_cols.append(l)
            if len(missing_cols) > 0:
                raise ValueError(
                    "These label_columns do not exist in df: %s" % (missing_cols)
                )

            # set targets
            targets = (
                df[self.label_columns].values
                if labels_exist
                else np.zeros((df.shape[0], len(self.label_columns)))
            )
            # set class names
            if train:
                self.set_classes(self.label_columns)
        # single column
        else:
            # set targets
            targets = (
                df[self.label_columns[0]].values
                if labels_exist
                else np.zeros(df.shape[0], dtype=np.int_)
            )
            if self.is_regression and isinstance(targets[0], str):
                warnings.warn(
                    "is_regression=True was supplied but targets are strings - casting to floats"
                )
                targets = targets.astype(np.float64)

            # set class_names if classification task and targets with integer labels
            if train and not self.is_regression and not isinstance(targets[0], str):
                class_names = list(set(targets))
                class_names.sort()
                class_names = list(map(str, class_names))
                if len(class_names) == 2:
                    class_names = [
                        "not_" + self.label_columns[0],
                        self.label_columns[0],
                    ]
                else:
                    class_names = [self.label_columns[0] + "_" + c for c in class_names]
                self.set_classes(class_names)

        # transform targets
        targets = super().apply(
            targets, train=train
        )  # self.c (new label_columns) may be modified here
        targets = (
            targets if len(targets.shape) > 1 else np.expand_dims(targets, 1)
        )  # since self.label_columns is list

        # modify DataFrame
        if labels_exist:
            for l in self.label_columns:
                del df[l]  # delete old label columns

        new_lab_cols = self.get_label_columns(squeeze=False)
        if len(new_lab_cols) != targets.shape[1]:
            raise ValueError(
                "mismatch between target shape and number of labels - please open ktrain GitHub issue"
            )
        for i, col in enumerate(new_lab_cols):
            df[col] = targets[:, i]
        df[new_lab_cols] = df[new_lab_cols].astype("float32")

        return df

    def apply_train(self, df):
        return self.apply(df, train=True)

    def apply_test(self, df):
        return self.apply(df, train=False)

Functions

def add_headers_to_df(fname_in, header_dict, fname_out=None)
def bad_data_tuple(data)

Checks for standard tuple or BERT-style tuple

def batchify(X, size)
Splits X into separate batches of the size specified by size.
Args:
    X(list): elements
    size(int): batch size
Returns:
    list of evenly sized batches with the last batch having the remaining elements
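
A minimal usage sketch:

```
from ktrain.utils import batchify

print(batchify(list(range(10)), size=3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```
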
def bert_data_tuple(data)

checks if data tuple is BERT-style format

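
A BERT-style tuple is (x, y), where x is a two-element list holding a token-ID array and an all-zero segment-ID array, and y is a NumPy array. A minimal sketch with hypothetical shapes:

```
import numpy as np
from ktrain.utils import bert_data_tuple

token_ids = np.ones((8, 128), dtype=np.int32)     # hypothetical token IDs
segment_ids = np.zeros((8, 128), dtype=np.int32)  # all zeros, as the check requires
y = np.zeros((8, 2))                              # one-hot labels

print(bert_data_tuple(([token_ids, segment_ids], y)))  # True
print(bert_data_tuple((token_ids, y)))                 # False: standard tuple
```
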
def check_array(X, y=None, X_name='X', y_name='targets')
def checkjava(path=None)

Checks if a Java executable is available for Tika.

Args

path(str): path to java executable

Returns

True if Java is available, False otherwise

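
A minimal usage sketch:

```
from ktrain.utils import checkjava

# By default, checks the TIKA_JAVA environment variable or "java" on the PATH.
if not checkjava():
    print("Java not found; Tika-based text extraction will be unavailable.")
```
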
def data_arg_check(train_data=None, val_data=None, train_required=False, val_required=False, ndarray_only=False)
def download(url, filename)
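
A minimal usage sketch; the URL is hypothetical. A textual progress bar is printed when the server reports a Content-Length header, and note that this helper disables SSL certificate verification (verify=False):

```
from ktrain.utils import download

download("https://example.com/data.zip", "data.zip")
```
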
def get_default_optimizer(lr=0.001, wd=0.01)
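
A minimal sketch compiling a toy Keras model with the returned AdamWeightDecay optimizer; the model architecture here is purely illustrative:

```
from tensorflow import keras
from ktrain.utils import get_default_optimizer

model = keras.Sequential([keras.layers.Dense(2, activation="softmax")])
opt = get_default_optimizer(lr=2e-5, wd=0.01)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["accuracy"])
```
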
def get_hf_model_name(model_id)
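
A minimal sketch showing how Hugging Face model IDs are reduced to a base model name (any org prefix and variant suffix are stripped; "xlm-roberta" is special-cased):

```
from ktrain.utils import get_hf_model_name

print(get_hf_model_name("distilbert-base-uncased"))  # 'distilbert'
print(get_hf_model_name("xlm-roberta-base"))         # 'xlm-roberta'
```
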
def get_ktrain_data()
def get_random_colors(n, name='hsv', hex_format=True)

Returns an array of n distinct colors drawn from the matplotlib colormap name; colors are hex strings when hex_format is True.

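
A minimal usage sketch:

```
from ktrain.utils import get_random_colors

colors = get_random_colors(3)                  # e.g., ['#ff0000', ...] hex strings
rgba = get_random_colors(3, hex_format=False)  # RGBA values instead of hex strings
```
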
def is_classifier(model)

checks for classification and multilabel from model

def is_crf(model)

checks for CRF sequence tagger.

def is_huggingface(model=None, data=None)

checks for Hugging Face transformer model from model and/or data

def is_huggingface_from_data(data)
def is_huggingface_from_model(model)
def is_imageclass_from_data(data)
def is_iter(data, ignore=False)
def is_ktrain_dataset(data)
def is_linkpred(model=None, data=None)
def is_multilabel(data)

checks for multilabel from data

def is_ner(model=None, data=None)
def is_ner_from_data(data)
def is_nodeclass(model=None, data=None)
def is_regression_from_data(data)

checks for regression task from data

def is_tabular_from_data(data)
def is_tf_keras()
def list2chunks(a, n)
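
A minimal usage sketch; unlike batchify, this splits a list into a fixed number of near-equal chunks and returns a generator:

```
from ktrain.utils import list2chunks

print(list(list2chunks(list(range(10)), 3)))
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```
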
def loss_fn_from_model(model)
def metrics_from_model(model)
def nclasses_from_data(data)
def nsamples_from_data(data)
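
A minimal usage sketch with a standard (x, y) tuple of NumPy arrays:

```
import numpy as np
from ktrain.utils import nsamples_from_data

x, y = np.zeros((100, 10)), np.zeros((100, 2))
print(nsamples_from_data((x, y)))  # 100
```
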
def ondisk(data)
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues)

This function prints and plots the confusion matrix. Normalization can be applied by setting normalize=True.

def plots(ims, figsize=(12, 6), rows=1, interp=False, titles=None)
def shape_from_data(data)
def vprint(s=None, verbose=1)
def y_from_data(data)

Classes

class YTransform (class_names=[], label_encoder=None)
Checks and transforms an array of targets; the transformed targets are returned.
Args:
  class_names(list):  labels associated with targets (e.g., ['negative', 'positive'])
                 Only used/required if:
                 1. targets are one/multi-hot-encoded
                 2. targets are integers and represent class IDs for classification task
                 Not required if:
                 1. targets are numeric and task is regression
                 2. targets are strings and task is classification (class_names are populated automatically)
  label_encoder(LabelEncoder): a prior instance of LabelEncoder.
                               If None, will be created when train=True
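
A minimal usage sketch for string classification targets; the LabelEncoder is fit on the training labels and reused for test labels:

```
from ktrain.utils import YTransform

yt = YTransform()
y_train = yt.apply_train(["neg", "pos", "pos", "neg"])  # one-hot, shape (4, 2)
print(yt.get_classes())                                 # ['neg', 'pos']
y_test = yt.apply_test(["pos", "neg"])                  # reuses fitted encoder
```
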

Subclasses

YTransformDataFrame

Methods

def apply(self, targets, train=True)
Expand source code
def apply(self, targets, train=True):
    if targets is None and train:
        raise ValueError("targets is None")
    elif targets is None and not train:
        return

    # validate labels against data
    targets = np.array(targets) if type(targets) == list else targets
    if len(targets.shape) > 1 and targets.shape[1] == 1:
        targets = np.squeeze(targets, axis=1)

    # handle numeric targets (regression)
    if len(targets.shape) == 1 and not isinstance(targets[0], str):
        # numeric targets
        if not self.get_classes() and train:
            warnings.warn(
                "Task is being treated as REGRESSION because "
                + "either class_names argument was not supplied or is_regression=True. "
                + "If this is incorrect, change accordingly."
            )
        if not self.get_classes():
            targets = np.array(targets, dtype=np.float32)
    # string targets (classification)
    elif len(targets.shape) == 1 and isinstance(targets[0], str):
        if not train and self.le is None:
            raise ValueError(
                "LabelEncoder has not been trained. Call with train=True"
            )
        if train:
            self.le = LabelEncoder()
            self.le.fit(targets)
            if self.get_classes():
                warnings.warn(
                    "class_names argument was ignored, as class names were extracted from the string labels in the dataset"
                )
            self.set_classes(self.le.classes_)
        targets = self.le.transform(
            targets
        )  # convert to numerical targets for classification
    # handle categorical targets (classification)
    elif len(targets.shape) > 1:
        if not self.get_classes():
            raise ValueError(
                "targets are 1-hot or multi-hot encoded but class_names is empty. "
                + "The class_names argument should have been supplied."
            )
        else:
            if train and len(self.get_classes()) != targets.shape[1]:
                raise ValueError(
                    "training targets suggest %s classes, but class_names are %s"
                    % (targets.shape[1], self.get_classes())
                )

    # numeric targets (classification)
    if len(targets.shape) == 1 and self.get_classes():
        if np.issubdtype(type(max(targets)), np.floating):
            warnings.warn(
                "class_names implies classification but targets array contains float(s) instead of integers or strings"
            )

        if train and (len(set(targets)) != int(max(targets) + 1)):
            raise ValueError(
                "len(set(targets) is %s but max(targets)+1 is  %s"
                % (len(set(targets)), int(max(targets) + 1))
            )
        targets = keras.utils.to_categorical(
            targets, num_classes=len(self.get_classes())
        )
    if train:
        self.train_called = True
    return targets
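
When targets are integer class IDs rather than strings, class_names must be supplied up front so the IDs can be one-hot encoded against a known label set (a sketch under the same constructor assumption as above):

```
yt = YTransform(class_names=["neg", "pos"])
y = yt.apply_train([0, 1, 1, 0])   # -> (4, 2) one-hot matrix
```
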
def apply_test(self, targets)
Expand source code
def apply_test(self, targets):
    return self.apply(targets, train=False)
def apply_train(self, targets)
Expand source code
def apply_train(self, targets):
    return self.apply(targets, train=True)
def get_classes(self)
Expand source code
def get_classes(self):
    return self.c
def set_classes(self, class_names)
Expand source code
def set_classes(self, class_names):
    self.c = (
        class_names.tolist() if isinstance(class_names, np.ndarray) else class_names
    )
class YTransformDataFrame (label_columns=[], is_regression=False)
Checks and transforms label columns in a DataFrame. A transformed copy of the DataFrame is returned; the original is left unchanged.
Args:
  label_columns(list): list of columns storing labels
  is_regression(bool): If True, task is regression and integer targets are treated as a numeric dependent variable.
                       If False, task is classification and integer targets are treated as class IDs.
Expand source code
class YTransformDataFrame(YTransform):
    def __init__(self, label_columns=[], is_regression=False):
        """
        ```
        Checks and transforms label columns in DataFrame. DataFrame is modified in place
        Args:
          label_columns(list): list of columns storing labels
          is_regression(bool): If True, task is regression and integer targets are treated as numeric dependent variable.
                               IF False, task is classification and integer targets are treated as class IDs.
        ```
        """
        self.is_regression = is_regression
        if isinstance(label_columns, str):
            label_columns = [label_columns]
        if not label_columns:
            raise ValueError("label_columns is required")
        self.label_columns = label_columns
        # class_names = label_columns if len(label_columns) > 1 else []
        super().__init__(class_names=[])

    def get_label_columns(self, squeeze=True):
        """
        Returns label columns of transformed DataFrame
        """
        if not self.train_called:
            raise Exception("apply_train should be called first")
        if not self.is_regression:
            new_lab_cols = self.c
        else:
            new_lab_cols = self.label_columns
        return new_lab_cols[0] if len(new_lab_cols) == 1 and squeeze else new_lab_cols

    def apply(self, df, train=True):
        df = (
            df.copy()
        )  # dep_fix: SettingWithCopy - prevent original DataFrame from losing old label columns

        labels_exist = all(c in df.columns.values for c in self.label_columns)
        if train and not labels_exist:
            raise ValueError(
                "dataframe is missing label columns: %s" % (self.label_columns)
            )

        # extract targets
        # todo: sort?
        if len(self.label_columns) > 1:
            if train and self.is_regression:
                warnings.warn(
                    "is_regression=True was supplied but ignored because multiple label columns imply classification"
                )
            missing_cols = [
                c for c in self.label_columns if c not in df.columns.values
            ]
            if missing_cols:
                raise ValueError(
                    "These label_columns do not exist in df: %s" % (missing_cols)
                )

            # set targets
            targets = (
                df[self.label_columns].values
                if labels_exist
                else np.zeros((df.shape[0], len(self.label_columns)))
            )
            # set class names
            if train:
                self.set_classes(self.label_columns)
        # single column
        else:
            # set targets
            targets = (
                df[self.label_columns[0]].values
                if labels_exist
                else np.zeros(df.shape[0], dtype=np.int_)
            )
            if self.is_regression and isinstance(targets[0], str):
                warnings.warn(
                    "is_regression=True was supplied but targets are strings - casting to floats"
                )
                targets = targets.astype(np.float64)

            # set class_names if classification task and targets with integer labels
            if train and not self.is_regression and not isinstance(targets[0], str):
                class_names = list(set(targets))
                class_names.sort()
                class_names = list(map(str, class_names))
                if len(class_names) == 2:
                    class_names = [
                        "not_" + self.label_columns[0],
                        self.label_columns[0],
                    ]
                else:
                    class_names = [self.label_columns[0] + "_" + c for c in class_names]
                self.set_classes(class_names)

        # transform targets
        targets = super().apply(
            targets, train=train
        )  # self.c (new label_columns) may be modified here
        targets = (
            targets if len(targets.shape) > 1 else np.expand_dims(targets, 1)
        )  # since self.label_columns is list

        # modify DataFrame
        if labels_exist:
            for l in self.label_columns:
                del df[l]  # delete old label columns

        new_lab_cols = self.get_label_columns(squeeze=False)
        if len(new_lab_cols) != targets.shape[1]:
            raise ValueError(
                "mismatch between target shape and number of labels - please open ktrain GitHub issue"
            )
        for i, col in enumerate(new_lab_cols):
            df[col] = targets[:, i]  # one column per class (or a single numeric column)
        df[new_lab_cols] = df[new_lab_cols].astype("float32")

        return df

    def apply_train(self, df):
        return self.apply(df, train=True)

    def apply_test(self, df):
        return self.apply(df, train=False)

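A minimal sketch of the round trip for a single integer label column (hypothetical data; note how a binary column named positive yields the derived class names not_positive and positive, per the naming logic in apply above):

```
import pandas as pd

df = pd.DataFrame({"text": ["good", "bad", "great"], "positive": [1, 0, 1]})
yt = YTransformDataFrame(label_columns="positive")
df_t = yt.apply_train(df)    # returns a transformed copy; df itself is unchanged
yt.get_classes()             # ['not_positive', 'positive']
yt.get_label_columns()       # ['not_positive', 'positive']
# df_t now holds one float32 column per class in place of 'positive'
```
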
Ancestors

YTransform

Methods

def apply(self, df, train=True)
Expand source code
def apply(self, df, train=True):
    df = (
        df.copy()
    )  # dep_fix: SettingWithCopy - prevent original DataFrame from losing old label columns

    labels_exist = all(c in df.columns.values for c in self.label_columns)
    if train and not labels_exist:
        raise ValueError(
            "dataframe is missing label columns: %s" % (self.label_columns)
        )

    # extract targets
    # todo: sort?
    if len(self.label_columns) > 1:
        if train and self.is_regression:
            warnings.warn(
                "is_regression=True was supplied but ignored because multiple label columns imply classification"
            )
        missing_cols = [
            c for c in self.label_columns if c not in df.columns.values
        ]
        if missing_cols:
            raise ValueError(
                "These label_columns do not exist in df: %s" % (missing_cols)
            )

        # set targets
        targets = (
            df[self.label_columns].values
            if labels_exist
            else np.zeros((df.shape[0], len(self.label_columns)))
        )
        # set class names
        if train:
            self.set_classes(self.label_columns)
    # single column
    else:
        # set targets
        targets = (
            df[self.label_columns[0]].values
            if labels_exist
            else np.zeros(df.shape[0], dtype=np.int_)
        )
        if self.is_regression and isinstance(targets[0], str):
            warnings.warn(
                "is_regression=True was supplied but targets are strings - casting to floats"
            )
            targets = targets.astype(np.float64)

        # set class_names if classification task and targets with integer labels
        if train and not self.is_regression and not isinstance(targets[0], str):
            class_names = list(set(targets))
            class_names.sort()
            class_names = list(map(str, class_names))
            if len(class_names) == 2:
                class_names = [
                    "not_" + self.label_columns[0],
                    self.label_columns[0],
                ]
            else:
                class_names = [self.label_columns[0] + "_" + c for c in class_names]
            self.set_classes(class_names)

    # transform targets
    targets = super().apply(
        targets, train=train
    )  # self.c (new label_columns) may be modified here
    targets = (
        targets if len(targets.shape) > 1 else np.expand_dims(targets, 1)
    )  # since self.label_columns is list

    # modify DataFrame
    if labels_exist:
        for l in self.label_columns:
            del df[l]  # delete old label columns

    new_lab_cols = self.get_label_columns(squeeze=False)
    if len(new_lab_cols) != targets.shape[1]:
        raise ValueError(
            "mismatch between target shape and number of labels - please open ktrain GitHub issue"
        )
    for i, col in enumerate(new_lab_cols):
        df[col] = targets[:, i]  # one column per class (or a single numeric column)
    df[new_lab_cols] = df[new_lab_cols].astype("float32")

    return df
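
Note that at prediction time the label columns may be absent: with train=False, missing labels do not raise, and placeholder zero targets are generated so the returned frame still carries label columns of the expected shape. Continuing the hypothetical example above:

```
df_unlabeled = pd.DataFrame({"text": ["fine"]})
df_t = yt.apply_test(df_unlabeled)   # label column not required when train=False
```
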
def apply_test(self, df)
Expand source code
def apply_test(self, df):
    return self.apply(df, train=False)
def apply_train(self, df)
Expand source code
def apply_train(self, df):
    return self.apply(df, train=True)
def get_label_columns(self, squeeze=True)

Returns label columns of transformed DataFrame

Expand source code
def get_label_columns(self, squeeze=True):
    """
    Returns label columns of transformed DataFrame
    """
    if not self.train_called:
        raise Exception("apply_train should be called first")
    if not self.is_regression:
        new_lab_cols = self.c
    else:
        new_lab_cols = self.label_columns
    return new_lab_cols[0] if len(new_lab_cols) == 1 and squeeze else new_lab_cols
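
With squeeze=True (the default), a single label column is returned as a bare string rather than a one-element list. A sketch for a regression task, where the original column name is kept:

```
yt = YTransformDataFrame(label_columns="price", is_regression=True)
df_t = yt.apply_train(pd.DataFrame({"price": [1.0, 2.5]}))
yt.get_label_columns()               # 'price' (squeezed to a string)
yt.get_label_columns(squeeze=False)  # ['price']
```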