Module ktrain.text.shallownlp.utils

Expand source code
#
# ShallowNLP is kept self-contained for now, so some or all of
# the functions here are copied from ktrain.text.textutils.

from .imports import *


def extract_filenames(corpus_path, follow_links=False):
    """
    Lazily yield the path of every file found under corpus_path.

    Args:
      corpus_path(str): directory that is walked recursively
      follow_links(bool): forwarded to os.walk as followlinks
    Yields:
      str: os.path.join(root, filename) for each file reported by os.walk
    Raises:
      ValueError: if corpus_path contains no entries. Because this is a
                  generator, the check runs when iteration starts, not
                  when the function is called.
    """
    if not os.listdir(corpus_path):
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            # No try/except here: a handler around a bare yield only hides
            # exceptions that the consumer throws into the generator.
            yield os.path.join(root, filename)


def detect_lang(texts, sample_size=32):
    """
    Detect the most frequent language among the first sample_size texts.

    Args:
      texts(str|list|np.ndarray): a single text or a collection of texts
      sample_size(int): number of leading texts passed to langdetect
    Returns:
      the value langdetect.detect returned most often; on a tie, the one
      seen first wins
    Raises:
      ValueError: if langdetect is unavailable or texts has the wrong type
      Exception: if no sampled text yielded a language
    """
    if not LANGDETECT:
        raise ValueError("langdetect is missing - install with pip install langdetect")

    # Local import keeps this block independent of the star import above.
    from collections import Counter

    if isinstance(texts, str):
        texts = [texts]
    if not isinstance(texts, (list, np.ndarray)):
        raise ValueError("texts must be a list or NumPy array of strings")
    lst = []
    for doc in texts[:sample_size]:
        try:
            lst.append(langdetect.detect(doc))
        except Exception:
            # Best effort: texts the detector cannot handle are skipped.
            # Exception (not a bare except) lets KeyboardInterrupt through.
            continue
    if not lst:
        raise Exception(
            "could not detect language in random sample of %s docs." % (sample_size)
        )
    # Counter gives a deterministic winner on ties (first seen), unlike
    # max() over a set, whose order depends on string hashing.
    return Counter(lst).most_common(1)[0][0]


def is_chinese(lang):
    """
    Report whether lang should be treated as Chinese.

    Codes starting with "zh-" qualify; "ja" and "ko" are accepted too,
    due to mistakes on short texts by langdetect.
    """
    if lang is not None and lang.startswith("zh-"):
        return True
    return lang in ["ja", "ko"]


def split_chinese(texts):
    """
    Segment each text with jieba and re-join its tokens with single spaces.

    A lone string is treated as a one-element list. Returns a list with
    one space-joined string per input text.
    """
    if not JIEBA:
        raise ValueError("jieba is missing - install with pip install jieba")
    docs = [texts] if isinstance(texts, str) else texts
    return [" ".join(jieba.cut(doc, cut_all=False)) for doc in docs]


def decode_by_line(texts, encoding="utf-8", verbose=1):
    """
    Decode text line by line and skip over errors.

    Args:
      texts(bytes|list): one undecoded document or a list of them
      encoding(str): codec passed to each line's decode()
      verbose(int): if truthy, print how many lines were skipped
    Returns:
      list: one string per input document. Lines that decoded are
            concatenated without their line terminators, because
            splitlines() drops them.
    """
    # Wrap a lone document so the loop walks documents, not the individual
    # bytes of one. str is still wrapped for backward compatibility.
    if isinstance(texts, (str, bytes)):
        texts = [texts]
    new_texts = []
    skips = 0
    num_lines = 0
    for doc in texts:
        decoded_lines = []
        for line in doc.splitlines():
            num_lines += 1
            try:
                decoded_lines.append(line.decode(encoding))
            except Exception:
                # Best effort: a line that fails to decode is counted and dropped.
                skips += 1
        new_texts.append("".join(decoded_lines))
    # Empty input has no lines; avoid dividing by zero.
    pct = round((skips * 1.0 / num_lines) * 100, 1) if num_lines else 0.0
    if verbose:
        print("skipped %s lines (%s%%) due to character decoding errors" % (skips, pct))
        if pct > 10:
            print("If this is too many, try a different encoding")
    return new_texts


def detect_encoding(texts, sample_size=32):
    """
    Guess the character encoding shared by the first sample_size documents.

    Args:
      texts(bytes|list): one undecoded document or a list of them
      sample_size(int): number of leading documents passed to chardet.detect
    Returns:
      str: the encoding reported most often, with ascii/utf8/utf-8
           normalized to "utf-8". Falls back to "utf-8" when no document
           yields an encoding.
    Raises:
      ValueError: if the detector library is unavailable
    """
    if not CHARDET:
        raise ValueError(
            "charset-normalizer is missing - install with pip install charset-normalizer"
        )

    # Local import keeps this block independent of the star import above.
    from collections import Counter

    # Wrap a lone document so slicing below selects documents, not bytes.
    if isinstance(texts, (str, bytes)):
        texts = [texts]
    detected = [chardet.detect(doc)["encoding"] for doc in texts[:sample_size]]
    # The detector can report no encoding (None); ignore such results so
    # that .lower() below is never called on None.
    detected = [enc for enc in detected if enc]
    if not detected:
        return "utf-8"
    encoding = Counter(detected).most_common(1)[0][0]
    encoding = "utf-8" if encoding.lower() in ["ascii", "utf8", "utf-8"] else encoding
    return encoding


def read_text(filename, verbose=1):
    """
    Read a file as bytes, detect its encoding and return the decoded text.

    Args:
      filename(str): path of the file to read
      verbose(int): if truthy, report when the line-by-line fallback is used
    Returns:
      str: decoded file content with leading/trailing whitespace stripped.
           If decoding the whole file fails, it is decoded line by line
           and lines that cannot be decoded are skipped.
    """
    with open(filename, "rb") as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        if verbose:
            print(
                "Decoding with %s failed 1st attempt - using %s with skips"
                % (encoding, encoding)
            )
        # decode_by_line works on a list of documents and returns a list,
        # so wrap the single document and unwrap the single result.
        decoded_text = decode_by_line([text], encoding=encoding, verbose=verbose)[0]
    return decoded_text.strip()


def sent_tokenize(text):
    """
    segment text into sentences

    The language is detected with detect_lang. If is_chinese accepts it,
    text is split with a punctuation-based regular expression. Otherwise
    segmenter.process is used and each sentence's token values are joined
    with single spaces.

    Args:
      text(str): text to segment
    Returns:
      list: the sentences, in order
    """
    lang = detect_lang(text)
    if is_chinese(lang):
        # Raw string: same pattern as before, but without the invalid
        # escape sequences (\., \!, \?) of a plain string literal.
        return re.findall(r"[^!?。\.\!\?]+[!?。\.\!\?]?", text, flags=re.U)
    sents = []
    for paragraph in segmenter.process(text):
        sents.extend(" ".join(t.value for t in sentence) for sentence in paragraph)
    return sents

Functions

def decode_by_line(texts, encoding='utf-8', verbose=1)

Decode text line by line and skip over errors.

Expand source code
def decode_by_line(texts, encoding="utf-8", verbose=1):
    """
    Decode text line by line and skip over errors.

    Args:
      texts(bytes|list): one undecoded document or a list of them
      encoding(str): codec passed to each line's decode()
      verbose(int): if truthy, print how many lines were skipped
    Returns:
      list: one string per input document. Lines that decoded are
            concatenated without their line terminators, because
            splitlines() drops them.
    """
    # Wrap a lone document so the loop walks documents, not the individual
    # bytes of one. str is still wrapped for backward compatibility.
    if isinstance(texts, (str, bytes)):
        texts = [texts]
    new_texts = []
    skips = 0
    num_lines = 0
    for doc in texts:
        decoded_lines = []
        for line in doc.splitlines():
            num_lines += 1
            try:
                decoded_lines.append(line.decode(encoding))
            except Exception:
                # Best effort: a line that fails to decode is counted and dropped.
                skips += 1
        new_texts.append("".join(decoded_lines))
    # Empty input has no lines; avoid dividing by zero.
    pct = round((skips * 1.0 / num_lines) * 100, 1) if num_lines else 0.0
    if verbose:
        print("skipped %s lines (%s%%) due to character decoding errors" % (skips, pct))
        if pct > 10:
            print("If this is too many, try a different encoding")
    return new_texts
def detect_encoding(texts, sample_size=32)
Expand source code
def detect_encoding(texts, sample_size=32):
    """
    Guess the character encoding shared by the first sample_size documents.

    Args:
      texts(bytes|list): one undecoded document or a list of them
      sample_size(int): number of leading documents passed to chardet.detect
    Returns:
      str: the encoding reported most often, with ascii/utf8/utf-8
           normalized to "utf-8". Falls back to "utf-8" when no document
           yields an encoding.
    Raises:
      ValueError: if the detector library is unavailable
    """
    if not CHARDET:
        raise ValueError(
            "charset-normalizer is missing - install with pip install charset-normalizer"
        )

    # Local import keeps this block independent of the star import above.
    from collections import Counter

    # Wrap a lone document so slicing below selects documents, not bytes.
    if isinstance(texts, (str, bytes)):
        texts = [texts]
    detected = [chardet.detect(doc)["encoding"] for doc in texts[:sample_size]]
    # The detector can report no encoding (None); ignore such results so
    # that .lower() below is never called on None.
    detected = [enc for enc in detected if enc]
    if not detected:
        return "utf-8"
    encoding = Counter(detected).most_common(1)[0][0]
    encoding = "utf-8" if encoding.lower() in ["ascii", "utf8", "utf-8"] else encoding
    return encoding
def detect_lang(texts, sample_size=32)

detect language

Expand source code
def detect_lang(texts, sample_size=32):
    """
    Detect the most frequent language among the first sample_size texts.

    Args:
      texts(str|list|np.ndarray): a single text or a collection of texts
      sample_size(int): number of leading texts passed to langdetect
    Returns:
      the value langdetect.detect returned most often; on a tie, the one
      seen first wins
    Raises:
      ValueError: if langdetect is unavailable or texts has the wrong type
      Exception: if no sampled text yielded a language
    """
    if not LANGDETECT:
        raise ValueError("langdetect is missing - install with pip install langdetect")

    # Local import keeps this block independent of the star import above.
    from collections import Counter

    if isinstance(texts, str):
        texts = [texts]
    if not isinstance(texts, (list, np.ndarray)):
        raise ValueError("texts must be a list or NumPy array of strings")
    lst = []
    for doc in texts[:sample_size]:
        try:
            lst.append(langdetect.detect(doc))
        except Exception:
            # Best effort: texts the detector cannot handle are skipped.
            # Exception (not a bare except) lets KeyboardInterrupt through.
            continue
    if not lst:
        raise Exception(
            "could not detect language in random sample of %s docs." % (sample_size)
        )
    # Counter gives a deterministic winner on ties (first seen), unlike
    # max() over a set, whose order depends on string hashing.
    return Counter(lst).most_common(1)[0][0]
def extract_filenames(corpus_path, follow_links=False)
Expand source code
def extract_filenames(corpus_path, follow_links=False):
    """
    Lazily yield the path of every file found under corpus_path.

    Args:
      corpus_path(str): directory that is walked recursively
      follow_links(bool): forwarded to os.walk as followlinks
    Yields:
      str: os.path.join(root, filename) for each file reported by os.walk
    Raises:
      ValueError: if corpus_path contains no entries. Because this is a
                  generator, the check runs when iteration starts, not
                  when the function is called.
    """
    if not os.listdir(corpus_path):
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            # No try/except here: a handler around a bare yield only hides
            # exceptions that the consumer throws into the generator.
            yield os.path.join(root, filename)
def is_chinese(lang)

include additional languages due to mistakes on short texts by langdetect

Expand source code
def is_chinese(lang):
    """
    Report whether lang should be treated as Chinese.

    Codes starting with "zh-" qualify; "ja" and "ko" are accepted too,
    due to mistakes on short texts by langdetect.
    """
    if lang is not None and lang.startswith("zh-"):
        return True
    return lang in ["ja", "ko"]
def read_text(filename)
Expand source code
def read_text(filename, verbose=1):
    """
    Read a file as bytes, detect its encoding and return the decoded text.

    Args:
      filename(str): path of the file to read
      verbose(int): if truthy, report when the line-by-line fallback is used
    Returns:
      str: decoded file content with leading/trailing whitespace stripped.
           If decoding the whole file fails, it is decoded line by line
           and lines that cannot be decoded are skipped.
    """
    with open(filename, "rb") as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        if verbose:
            print(
                "Decoding with %s failed 1st attempt - using %s with skips"
                % (encoding, encoding)
            )
        # decode_by_line works on a list of documents and returns a list,
        # so wrap the single document and unwrap the single result.
        decoded_text = decode_by_line([text], encoding=encoding, verbose=verbose)[0]
    return decoded_text.strip()
def sent_tokenize(text)

segment text into sentences

Expand source code
def sent_tokenize(text):
    """
    segment text into sentences

    The language is detected with detect_lang. If is_chinese accepts it,
    text is split with a punctuation-based regular expression. Otherwise
    segmenter.process is used and each sentence's token values are joined
    with single spaces.

    Args:
      text(str): text to segment
    Returns:
      list: the sentences, in order
    """
    lang = detect_lang(text)
    if is_chinese(lang):
        # Raw string: same pattern as before, but without the invalid
        # escape sequences (\., \!, \?) of a plain string literal.
        return re.findall(r"[^!?。\.\!\?]+[!?。\.\!\?]?", text, flags=re.U)
    sents = []
    for paragraph in segmenter.process(text):
        sents.extend(" ".join(t.value for t in sentence) for sentence in paragraph)
    return sents
def split_chinese(texts)
Expand source code
def split_chinese(texts):
    """
    Segment each text with jieba and re-join its tokens with single spaces.

    A lone string is treated as a one-element list. Returns a list with
    one space-joined string per input text.
    """
    if not JIEBA:
        raise ValueError("jieba is missing - install with pip install jieba")
    docs = [texts] if isinstance(texts, str) else texts
    return [" ".join(jieba.cut(doc, cut_all=False)) for doc in docs]