Module ktrain.text.shallownlp
Expand source code
from .classifier import Classifier
from .ner import NER
from .searcher import *
from .utils import extract_filenames, read_text, sent_tokenize
__all__ = [
"Classifier",
"Searcher",
"search",
"find_chinese",
"find_arabic",
"find_russian",
"read_text",
"NER",
"sent_tokenize",
"extract_filenames",
"read_text",
]
Sub-modules
ktrain.text.shallownlp.classifier
ktrain.text.shallownlp.imports
ktrain.text.shallownlp.ner
ktrain.text.shallownlp.searcher
ktrain.text.shallownlp.utils
Functions
def extract_filenames(corpus_path, follow_links=False)
-
Expand source code
def extract_filenames(corpus_path, follow_links=False):
    if os.listdir(corpus_path) == []:
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            try:
                yield os.path.join(root, filename)
            except Exception:
                continue
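A minimal usage sketch combining extract_filenames with read_text (documented below); the corpus folder path is hypothetical.

from ktrain.text.shallownlp import extract_filenames, read_text

# walk a hypothetical corpus folder and decode each file
for path in extract_filenames("data/mycorpus"):
    text = read_text(path)
    print(path, len(text))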
def find_arabic(s)
-
Expand source code
def find_arabic(s):
    return re.findall(r"[\u0600-\u06FF]+", s)
def find_chinese(s)
-
Expand source code
def find_chinese(s):
    return re.findall(r"[\u4e00-\u9fff]+", s)
def find_russian(s)
-
Expand source code
def find_russian(s):
    return find_cyrillic(s)
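A quick sketch of the find_* helpers on a mixed-language string; the sample text and the printed results are illustrative only.

from ktrain.text.shallownlp import find_chinese, find_arabic, find_russian

mixed = "Report: 语言处理 and اللغة and Привет"
print(find_chinese(mixed))   # e.g., ['语言处理']
print(find_arabic(mixed))    # e.g., ['اللغة']
print(find_russian(mixed))   # e.g., ['Привет']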
def read_text(filename)
-
Expand source code
def read_text(filename):
    with open(filename, "rb") as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except:
        U.vprint(
            "Decoding with %s failed 1st attempt - using %s with skips" % (encoding, encoding),
            verbose=verbose,
        )
        decoded_text = decode_by_line(text, encoding=encoding)
    return decoded_text.strip()
def search(query, doc, case_sensitive=False, keys=[], progress=False)
-
Expand source code
def search(query, doc, case_sensitive=False, keys=[], progress=False):
    searcher = Searcher(query)
    return searcher.search(doc, case_sensitive=case_sensitive, keys=keys, progress=progress)
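A usage sketch for the module-level search helper; the documents, query, and keys are illustrative.

from ktrain.text.shallownlp import search

docs = [
    "ktrain is a lightweight wrapper around TensorFlow Keras.",
    "shallownlp provides a simple keyword searcher.",
]
# returns a list of (key, query, number of matches) tuples for documents with hits
results = search("keyword searcher", docs, keys=["doc1", "doc2"])
print(results)  # e.g., [('doc2', 'keyword searcher', 1)]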
def sent_tokenize(text)
-
segment text into sentences
Expand source code
def sent_tokenize(text): """ segment text into sentences """ lang = detect_lang(text) sents = [] if is_chinese(lang): for sent in re.findall("[^!?。\.\!\?]+[!?。\.\!\?]?", text, flags=re.U): sents.append(sent) else: for paragraph in segmenter.process(text): for sentence in paragraph: sents.append(" ".join([t.value for t in sentence])) return sents
Classes
class Classifier (model=None)
-
instantiate a classifier with an optional previously-saved model
Expand source code
class Classifier:
    def __init__(self, model=None):
        """
        instantiate a classifier with an optional previously-saved model
        """
        self.model = None

    def create_model(self, ctype, texts, use_tfidf=False, **kwargs):
        """
        create a model
        Args:
          ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
          texts(list): list of texts
          kwargs(dict): additional parameters should have one of the following prefixes:
            vec__ : hyperparameters to CountVectorizer (e.g., vec__max_features=10000)
            tfidf__ : hyperparameters to TfidfTransformer
            clf__ : hyperparameters to the classifier (specific to ctype).
                    If ctype='logreg', an example is clf__solver='liblinear'.
        """
        if ctype == "nbsvm":
            if kwargs.get("vec__binary", False) is False:
                warnings.warn("nbsvm must use binary=True - changing automatically")
            if use_tfidf:
                warnings.warn("nbsvm must use use_tfidf=False = changing automatically")
        vec_kwargs = dict((k.replace("vec__", ""), kwargs[k]) for k in kwargs if k.startswith("vec__"))
        tfidf_kwargs = dict((k.replace("tfidf__", ""), kwargs[k]) for k in kwargs if k.startswith("tfidf__"))
        clf_kwargs = dict((k.replace("clf__", ""), kwargs[k]) for k in kwargs if k.startswith("clf__"))

        lang = U.detect_lang(texts)
        if U.is_chinese(lang) and not vec_kwargs.get("token_pattern", None):
            vec_kwargs["token_pattern"] = r"(?u)\b\w+\b"
        elif not kwargs.get("vec__token_pattern", None):
            vec_kwargs["token_pattern"] = r"\w+|[%s]" % string.punctuation

        if ctype == "nbsvm":
            clf = NBSVM(**clf_kwargs)
        elif ctype == "logreg":
            clf = LogisticRegression(**clf_kwargs)
        elif ctype == "sgdclassifier":
            clf = SGDClassifier(**clf_kwargs)
        else:
            raise ValueError("Unknown ctype: %s" % (ctype))

        pipeline = [("vect", CountVectorizer(**vec_kwargs))]
        if use_tfidf:
            pipeline.append(("tfidf", TfidfTransformer(**tfidf_kwargs)))
        pipeline.append(("clf", clf))
        self.model = Pipeline(pipeline)
        return

    @classmethod
    def load_texts_from_folder(cls, folder_path, subfolders=None, shuffle=True, encoding=None):
        """
        load text files from folder
        Args:
          folder_path(str): path to folder containing documents. The supplied folder should contain
                            a subfolder for each category, which will be used as the class label
          subfolders(list): list of subfolders under folder_path to consider.
                            Example: If folder_path contains subfolders pos, neg, and unlabeled,
                            the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
          shuffle(bool): If True, list of texts will be shuffled
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        """
        bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
        texts = bunch.data
        labels = bunch.target
        label_names = bunch.target_names
        # print('target names:')
        # for idx, label_name in enumerate(bunch.target_names):
        #     print('\t%s:%s' % (idx, label_name))

        # decode based on supplied encoding
        if encoding is None:
            encoding = U.detect_encoding(texts)
            if encoding != "utf-8":
                print("detected encoding: %s" % (encoding))
        try:
            texts = [text.decode(encoding) for text in texts]
        except:
            print("Decoding with %s failed 1st attempt - using %s with skips" % (encoding, encoding))
            texts = U.decode_by_line(texts, encoding=encoding)
        return (texts, labels, label_names)

    @classmethod
    def load_texts_from_csv(cls, csv_filepath, text_column="text", label_column="label", sep=",", encoding=None):
        """
        load text files from csv file
        CSV should have at least two columns. Example:
              Text               |  Label
          I love this movie.     |  positive
          I hated this movie.    |  negative
        Args:
          csv_filepath(str): path to CSV file
          text_column(str): name of column containing the texts. default:'text'
          label_column(str): name of column containing the labels in string format. default:'label'
          sep(str): character that separates columns in CSV. default:','
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        """
        if encoding is None:
            with open(csv_filepath, "rb") as f:
                encoding = U.detect_encoding([f.read()])
                if encoding != "utf-8":
                    print("detected encoding: %s (if wrong, set manually)" % (encoding))
        import pandas as pd

        df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
        texts = df[text_column].fillna("fillna").values
        labels = df[label_column].values
        le = LabelEncoder()
        le.fit(labels)
        labels = le.transform(labels)
        return (texts, labels, le.classes_)

    def fit(self, x_train, y_train, ctype="logreg"):
        """
        train a classifier
        Args:
          x_train(list or np.ndarray): training texts
          y_train(np.ndarray): training labels
          ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
        """
        lang = U.detect_lang(x_train)
        if U.is_chinese(lang):
            x_train = U.split_chinese(x_train)
        if self.model is None:
            self.create_model(ctype, x_train)
        self.model.fit(x_train, y_train)
        return self

    def predict(self, x_test, return_proba=False):
        """
        make predictions on text data
        Args:
          x_test(list or np.ndarray or str): array of texts on which to make predictions
                                             or a string representing a single text
        """
        if return_proba and not hasattr(self.model["clf"], "predict_proba"):
            raise ValueError("%s does not support predict_proba" % (type(self.model["clf"]).__name__))
        if isinstance(x_test, str):
            x_test = [x_test]
        lang = U.detect_lang(x_test)
        if U.is_chinese(lang):
            x_test = U.split_chinese(x_test)
        if self.model is None:
            raise ValueError("model is None - call fit or load to set the model")
        if return_proba:
            predicted = self.model.predict_proba(x_test)
        else:
            predicted = self.model.predict(x_test)
        if len(predicted) == 1:
            predicted = predicted[0]
        return predicted

    def predict_proba(self, x_test):
        """
        predict_proba
        """
        return self.predict(x_test, return_proba=True)

    def evaluate(self, x_test, y_test):
        """
        evaluate
        Args:
          x_test(list or np.ndarray): test texts
          y_test(np.ndarray): test labels
        """
        predicted = self.predict(x_test)
        return np.mean(predicted == y_test)

    def save(self, filename):
        """
        save model
        """
        dump(self.model, filename)

    def load(self, filename):
        """
        load model
        """
        self.model = load(filename)

    def grid_search(self, params, x_train, y_train, n_jobs=-1):
        """
        Performs grid search to find optimal set of hyperparameters
        Args:
          params (dict): A dictionary defining the space of the search.
                         Example for finding optimal value of alpha in NBSVM:
                           parameters = {
                               # 'clf__C': (1e0, 1e-1, 1e-2),
                               'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                               # 'clf__fit_intercept': (True, False),
                               # 'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                           }
          n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
        """
        gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
        gs_clf = gs_clf.fit(x_train, y_train)
        # gs_clf.best_score_
        for param_name in sorted(params.keys()):
            print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
        return
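An end-to-end sketch of the Classifier workflow (load, fit, evaluate, predict); the folder 'data/mycorpus' is hypothetical and is assumed to contain one subfolder per class.

from sklearn.model_selection import train_test_split
from ktrain.text.shallownlp import Classifier

# load texts and integer-encoded labels from a hypothetical corpus folder
texts, labels, label_names = Classifier.load_texts_from_folder("data/mycorpus")
x_train, x_test, y_train, y_test = train_test_split(texts, labels, test_size=0.1)

clf = Classifier()
clf.fit(x_train, y_train, ctype="logreg")
print("accuracy:", clf.evaluate(x_test, y_test))
print(label_names[clf.predict("a single text to classify")])  # map the label id back to its name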
Static methods
def load_texts_from_csv(csv_filepath, text_column='text', label_column='label', sep=',', encoding=None)
-
load text files from csv file.
CSV should have at least two columns. Example:

      Text               |  Label
  I love this movie.     |  positive
  I hated this movie.    |  negative

Args:
  csv_filepath(str): path to CSV file
  text_column(str): name of column containing the texts. default:'text'
  label_column(str): name of column containing the labels in string format. default:'label'
  sep(str): character that separates columns in CSV. default:','
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_csv(cls, csv_filepath, text_column="text", label_column="label", sep=",", encoding=None):
    """
    load text files from csv file
    CSV should have at least two columns. Example:
          Text               |  Label
      I love this movie.     |  positive
      I hated this movie.    |  negative
    Args:
      csv_filepath(str): path to CSV file
      text_column(str): name of column containing the texts. default:'text'
      label_column(str): name of column containing the labels in string format. default:'label'
      sep(str): character that separates columns in CSV. default:','
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    """
    if encoding is None:
        with open(csv_filepath, "rb") as f:
            encoding = U.detect_encoding([f.read()])
            if encoding != "utf-8":
                print("detected encoding: %s (if wrong, set manually)" % (encoding))
    import pandas as pd

    df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
    texts = df[text_column].fillna("fillna").values
    labels = df[label_column].values
    le = LabelEncoder()
    le.fit(labels)
    labels = le.transform(labels)
    return (texts, labels, le.classes_)
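A usage sketch for load_texts_from_csv; 'reviews.csv' is a hypothetical file with 'text' and 'label' columns.

from ktrain.text.shallownlp import Classifier

texts, labels, label_names = Classifier.load_texts_from_csv(
    "reviews.csv", text_column="text", label_column="label"
)
# labels are integer-encoded; label_names maps each integer back to its original string label
print(len(texts), label_names)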
def load_texts_from_folder(folder_path, subfolders=None, shuffle=True, encoding=None)
-
load text files from folder

Args:
  folder_path(str): path to folder containing documents. The supplied folder should contain
    a subfolder for each category, which will be used as the class label
  subfolders(list): list of subfolders under folder_path to consider.
    Example: If folder_path contains subfolders pos, neg, and unlabeled, the unlabeled folder
    can be ignored by setting subfolders=['pos', 'neg']
  shuffle(bool): If True, list of texts will be shuffled
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_folder(cls, folder_path, subfolders=None, shuffle=True, encoding=None):
    """
    load text files from folder
    Args:
      folder_path(str): path to folder containing documents. The supplied folder should contain
                        a subfolder for each category, which will be used as the class label
      subfolders(list): list of subfolders under folder_path to consider.
                        Example: If folder_path contains subfolders pos, neg, and unlabeled,
                        the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
      shuffle(bool): If True, list of texts will be shuffled
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    """
    bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
    texts = bunch.data
    labels = bunch.target
    label_names = bunch.target_names
    # print('target names:')
    # for idx, label_name in enumerate(bunch.target_names):
    #     print('\t%s:%s' % (idx, label_name))

    # decode based on supplied encoding
    if encoding is None:
        encoding = U.detect_encoding(texts)
        if encoding != "utf-8":
            print("detected encoding: %s" % (encoding))
    try:
        texts = [text.decode(encoding) for text in texts]
    except:
        print("Decoding with %s failed 1st attempt - using %s with skips" % (encoding, encoding))
        texts = U.decode_by_line(texts, encoding=encoding)
    return (texts, labels, label_names)
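A short sketch of the subfolders filter described above; 'data/imdb' with pos/, neg/, and unlabeled/ subfolders is hypothetical.

from ktrain.text.shallownlp import Classifier

# ignore the 'unlabeled' subfolder by listing only the labeled ones
texts, labels, label_names = Classifier.load_texts_from_folder(
    "data/imdb", subfolders=["pos", "neg"]
)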
Methods
def create_model(self, ctype, texts, use_tfidf=False, **kwargs)
-
create a model

Args:
  ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
  texts(list): list of texts
  kwargs(dict): additional parameters should have one of the following prefixes:
    vec__ : hyperparameters to CountVectorizer (e.g., vec__max_features=10000)
    tfidf__ : hyperparameters to TfidfTransformer
    clf__ : hyperparameters to the classifier (specific to ctype).
            If ctype='logreg', an example is clf__solver='liblinear'.
Expand source code
def create_model(self, ctype, texts, use_tfidf=False, **kwargs): """ ``` create a model Args: ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'} texts(list): list of texts kwargs(dict): additional parameters should have one of the following prefixes: vec__ : hyperparameters to CountVectorizer (e.g., vec__max_features=10000) tfidf__ : hyperparameters to TfidfTransformer clf__: hyperparameters to classifier (specific to ctype). If ctype='logreg', then an example is clf__solver='liblinear'. ``` """ if ctype == "nbsvm": if kwargs.get("vec__binary", False) is False: warnings.warn("nbsvm must use binary=True - changing automatically") if use_tfidf: warnings.warn("nbsvm must use use_tfidf=False = changing automatically") vec_kwargs = dict( (k.replace("vec__", ""), kwargs[k]) for k in kwargs if k.startswith("vec__") ) tfidf_kwargs = dict( (k.replace("tfidf__", ""), kwargs[k]) for k in kwargs if k.startswith("tfidf__") ) clf_kwargs = dict( (k.replace("clf__", ""), kwargs[k]) for k in kwargs if k.startswith("clf__") ) lang = U.detect_lang(texts) if U.is_chinese(lang) and not vec_kwargs.get("token_pattern", None): vec_kwargs["token_pattern"] = r"(?u)\b\w+\b" elif not kwargs.get("vec__token_pattern", None): vec_kwargs["token_pattern"] = r"\w+|[%s]" % string.punctuation if ctype == "nbsvm": clf = NBSVM(**clf_kwargs) elif ctype == "logreg": clf = LogisticRegression(**clf_kwargs) elif ctype == "sgdclassifier": clf = SGDClassifier(**clf_kwargs) else: raise ValueError("Unknown ctype: %s" % (ctype)) pipeline = [("vect", CountVectorizer(**vec_kwargs))] if use_tfidf: pipeline.append(("tfidf", TfidfTransformer(**tfidf_kwargs))) pipeline.append(("clf", clf)) self.model = Pipeline(pipeline) return
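A sketch showing how the vec__/tfidf__/clf__ prefixes are forwarded to the underlying scikit-learn pipeline; the toy training data is illustrative.

from ktrain.text.shallownlp import Classifier

x_train = ["great movie", "terrible film", "loved it", "hated it"]  # toy data
y_train = [1, 0, 1, 0]

clf = Classifier()
clf.create_model(
    "logreg",
    x_train,
    use_tfidf=True,
    vec__max_features=10000,   # forwarded to CountVectorizer
    tfidf__use_idf=True,       # forwarded to TfidfTransformer
    clf__solver="liblinear",   # forwarded to LogisticRegression
)
clf.fit(x_train, y_train)      # fit() reuses the pipeline built above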
def evaluate(self, x_test, y_test)
-
evaluate

Args:
  x_test(list or np.ndarray): test texts
  y_test(np.ndarray): test labels
Expand source code
def evaluate(self, x_test, y_test): """ ``` evaluate Args: x_test(list or np.ndarray): training texts y_test(np.ndarray): training labels ``` """ predicted = self.predict(x_test) return np.mean(predicted == y_test)
def fit(self, x_train, y_train, ctype='logreg')
-
train a classifier

Args:
  x_train(list or np.ndarray): training texts
  y_train(np.ndarray): training labels
  ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
Expand source code
def fit(self, x_train, y_train, ctype="logreg"): """ ``` train a classifier Args: x_train(list or np.ndarray): training texts y_train(np.ndarray): training labels ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:nbsvm ``` """ lang = U.detect_lang(x_train) if U.is_chinese(lang): x_train = U.split_chinese(x_train) if self.model is None: self.create_model(ctype, x_train) self.model.fit(x_train, y_train) return self
def grid_search(self, params, x_train, y_train, n_jobs=-1)
-
Performs grid search to find optimal set of hyperparameters

Args:
  params (dict): A dictionary defining the space of the search.
    Example for finding optimal value of alpha in NBSVM:
      parameters = {
          # 'clf__C': (1e0, 1e-1, 1e-2),
          'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
          # 'clf__fit_intercept': (True, False),
          # 'clf__beta' : (0.1, 0.25, 0.5, 0.9)
      }
  n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
Expand source code
def grid_search(self, params, x_train, y_train, n_jobs=-1): """ ``` Performs grid search to find optimal set of hyperparameters Args: params (dict): A dictionary defining the space of the search. Example for finding optimal value of alpha in NBSVM: parameters = { #'clf__C': (1e0, 1e-1, 1e-2), 'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0), #'clf__fit_intercept': (True, False), #'clf__beta' : (0.1, 0.25, 0.5, 0.9) } n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors) ``` """ gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs) gs_clf = gs_clf.fit(x_train, y_train) # gs_clf.best_score_ for param_name in sorted(params.keys()): print("%s: %r" % (param_name, gs_clf.best_params_[param_name])) return
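A sketch of grid_search on a fitted model; the toy data and parameter grid are illustrative (grid_search operates on self.model, so fit must be called first).

from ktrain.text.shallownlp import Classifier

x_train = ["great movie", "terrible film", "loved it", "hated it"] * 5  # toy data
y_train = [1, 0, 1, 0] * 5

clf = Classifier()
clf.fit(x_train, y_train, ctype="logreg")
clf.grid_search({"clf__C": (1.0, 0.1, 0.01)}, x_train, y_train)  # prints the best value per parameter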
def load(self, filename)
-
load model
Expand source code
def load(self, filename): """ load model """ self.model = load(filename)
def predict(self, x_test, return_proba=False)
-
make predictions on text data

Args:
  x_test(list or np.ndarray or str): array of texts on which to make predictions
    or a string representing a single text
Expand source code
def predict(self, x_test, return_proba=False): """ ``` make predictions on text data Args: x_test(list or np.ndarray or str): array of texts on which to make predictions or a string representing text ``` """ if return_proba and not hasattr(self.model["clf"], "predict_proba"): raise ValueError( "%s does not support predict_proba" % (type(self.model["clf"]).__name__) ) if isinstance(x_test, str): x_test = [x_test] lang = U.detect_lang(x_test) if U.is_chinese(lang): x_test = U.split_chinese(x_test) if self.model is None: raise ValueError("model is None - call fit or load to set the model") if return_proba: predicted = self.model.predict_proba(x_test) else: predicted = self.model.predict(x_test) if len(predicted) == 1: predicted = predicted[0] return predicted
def predict_proba(self, x_test)
-
predict_proba
Expand source code
def predict_proba(self, x_test): """ predict_proba """ return self.predict(x_test, return_proba=True)
def save(self, filename)
-
save model
Expand source code
def save(self, filename): """ save model """ dump(self.model, filename)
class NER (lang='en', predictor_path=None)
-
pretrained NER. Currently, only English, Chinese, and Russian are supported.

Args:
  lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, ru=Russian
Expand source code
class NER: def __init__(self, lang="en", predictor_path=None): """ ``` pretrained NER. Only English and Chinese are currenty supported. Args: lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English , zh=Chinese, or ru=Russian ``` """ if lang is None: raise ValueError( 'lang is required (e.g., "en" for English, "zh" for Chinese, "ru" for Russian, etc.' ) if predictor_path is None and lang not in ["en", "zh", "ru"]: raise ValueError( "Unsupported language: if predictor_path is None, then lang must be " + "'en' for English, 'zh' for Chinese, or 'ru' for Chinese" ) self.lang = lang if os.environ.get("DISABLE_V2_BEHAVIOR", None) != "1": warnings.warn( "Please add os.environ['DISABLE_V2_BEHAVIOR'] = '1' at top of your script or notebook" ) msg = ( "\nNER in ktrain uses the CRF module from keras_contrib, which is not yet\n" + "fully compatible with TensorFlow 2. To use NER, you must add the following to the top of your\n" + "script or notebook BEFORE you import ktrain (after restarting runtime):\n\n" + "import os\n" + "os.environ['DISABLE_V2_BEHAVIOR'] = '1'\n" ) print(msg) return else: import tensorflow.compat.v1 as tf tf.disable_v2_behavior() if predictor_path is None and self.lang == "zh": dirpath = os.path.dirname(os.path.abspath(__file__)) fpath = os.path.join(dirpath, "ner_models/ner_chinese") elif predictor_path is None and self.lang == "ru": dirpath = os.path.dirname(os.path.abspath(__file__)) fpath = os.path.join(dirpath, "ner_models/ner_russian") elif predictor_path is None and self.lang == "en": dirpath = os.path.dirname(os.path.abspath(__file__)) fpath = os.path.join(dirpath, "ner_models/ner_english") elif predictor_path is None: raise ValueError( "Unsupported language: if predictor_path is None, then lang must be " + "'en' for English, 'zh' for Chinese, or 'ru' for Chinese" ) else: if not os.path.isfile(predictor_path) or not os.path.isfile( predictor_path + ".preproc" ): raise ValueError( "could not find a valid predictor model " + "%s or valid Preprocessor %s at specified path" % (predictor_path, predictor_path + ".preproc") ) fpath = predictor_path try: import io from contextlib import redirect_stdout f = io.StringIO() with redirect_stdout(f): import ktrain except: raise ValueError( "ktrain could not be imported. 
Install with: pip install ktrain" ) self.predictor = ktrain.load_predictor(fpath) def predict(self, texts, merge_tokens=True, batch_size=32): """ ``` Extract named entities from supplied text Args: texts (list of str or str): list of texts to annotate merge_tokens(bool): If True, tokens will be merged together by the entity to which they are associated: ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER') batch_size(int): Batch size to use for predictions (default:32) ``` """ if isinstance(texts, str): texts = [texts] self.predictor.batch_size = batch_size texts = [t.strip() for t in texts] results = self.predictor.predict(texts, merge_tokens=merge_tokens) if len(results) == 1: results = results[0] return results # 2020-04-30: moved to text.ner.predictor # def merge_tokens(self, annotated_sentence): # if self.lang.startswith('zh'): # sep = '' # else: # sep = ' ' # current_token = "" # current_tag = "" # entities = [] # for tup in annotated_sentence: # token = tup[0] # entity = tup[1] # tag = entity.split('-')[1] if '-' in entity else None # prefix = entity.split('-')[0] if '-' in entity else None # # not within entity # if tag is None and not current_token: # continue # # beginning of entity # #elif tag and prefix=='B': # elif tag and (prefix=='B' or prefix=='I' and not current_token): # if current_token: # consecutive entities # entities.append((current_token, current_tag)) # current_token = "" # current_tag = None # current_token = token # current_tag = tag # # end of entity # elif tag is None and current_token: # entities.append((current_token, current_tag)) # current_token = "" # current_tag = None # continue # # within entity # elif tag and current_token: # prefix I # current_token = current_token + sep + token # current_tag = tag # return entities
Methods
def predict(self, texts, merge_tokens=True, batch_size=32)
-
Extract named entities from supplied text

Args:
  texts (list of str or str): list of texts to annotate
  merge_tokens(bool): If True, tokens will be merged together by the entity to which they are
    associated: ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
  batch_size(int): Batch size to use for predictions (default:32)
Expand source code
def predict(self, texts, merge_tokens=True, batch_size=32): """ ``` Extract named entities from supplied text Args: texts (list of str or str): list of texts to annotate merge_tokens(bool): If True, tokens will be merged together by the entity to which they are associated: ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER') batch_size(int): Batch size to use for predictions (default:32) ``` """ if isinstance(texts, str): texts = [texts] self.predictor.batch_size = batch_size texts = [t.strip() for t in texts] results = self.predictor.predict(texts, merge_tokens=merge_tokens) if len(results) == 1: results = results[0] return results
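A usage sketch for the pretrained English NER model; note the DISABLE_V2_BEHAVIOR requirement from the constructor, and the printed result is illustrative.

import os
os.environ["DISABLE_V2_BEHAVIOR"] = "1"   # must be set before importing ktrain

from ktrain.text.shallownlp import NER

ner = NER(lang="en")
print(ner.predict("Paul Newman is a great actor."))
# e.g., [('Paul Newman', 'PER')] when merge_tokens=True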
class Searcher (queries, lang=None)
-
Search for keywords in text documents
Args:
  queries(list of str): list of text queries
  lang(str): language of queries. default:None --> auto-detected
Expand source code
class Searcher: """ Search for keywords in text documents """ def __init__(self, queries, lang=None): """ ``` Args: queries(list of str): list of chinese text queries lang(str): language of queries. default:None --> auto-detected ``` """ self.queries = queries if isinstance(self.queries, str): self.queries = [self.queries] self.lang = lang if self.lang is None: self.lang = U.detect_lang(queries) # print("lang:%s" %(self.lang)) def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True): """ ``` executes self.queries on supplied list of documents Args: docs(list of str): list of chinese texts case_sensitive(bool): If True, case sensitive search keys(list): list keys for supplied docs (e.g., file paths). default: key is index in range(len(docs)) min_matches(int): results must have at least these many word matches progress(bool): whether or not to show progress bar Returns: list of tuples of results of the form: (key, query, no. of matches) For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match ``` """ if isinstance(docs, str): docs = [docs] if keys and len(keys) != len(docs): raise ValueError("lengths of keys and docs must be the same") results = [] l = len(docs) for idx, text in enumerate(docs): for q in self.queries: if U.is_chinese(self.lang): r = self._search_chinese( q, [text], min_matches=min_matches, parse=1, progress=False ) elif self.lang == "ar": r = self._search( q, [text], case_sensitive=case_sensitive, min_matches=min_matches, progress=False, substrings_on=True, ) else: r = self._search( q, [text], case_sensitive=case_sensitive, min_matches=min_matches, progress=False, substrings_on=False, ) if not r: continue r = r[0] k = idx if keys: k = keys[idx] num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2]) results.append((k, q, num_matches)) if progress: printProgressBar( idx + 1, l, prefix="progress: ", suffix="complete", length=50 ) return results def _search( self, query, docs, case_sensitive=False, substrings_on=False, min_matches=1, progress=True, ): """ ``` search documents for query string. Args: query(str or list): the word or phrase to search (or list of them) if list is provided, each element is combined using OR docs (list of str): list of text documents case_sensitive(bool): If True, case sensitive search substrings_on(bool): whether to use "\b" in regex. default:True If True, will find substrings returns: list or tuple: Returns list of results if len(docs) > 1. 
Otherwise, returns tuple of results ``` """ if not isinstance(query, (list, tuple, str)): raise ValueError("query must be str or list of str") if isinstance(query, str): query = [query] if not isinstance(docs, (list, np.ndarray)): raise ValueError("docs must be list of str") flag = 0 if not case_sensitive: flag = re.I qlist = [] for q in query: qlist.append("\s+".join(q.split())) original_query = query query = "|".join(qlist) bound = r"\b" if substrings_on: bound = "" pattern_str = r"%s(?:%s)%s" % (bound, query, bound) pattern = re.compile(pattern_str, flag) results = [] l = len(docs) for idx, text in enumerate(docs): matches = pattern.findall(text) if matches and len(matches) >= min_matches: results.append((idx, text, matches)) if progress: printProgressBar( idx + 1, l, prefix="progress: ", suffix="complete", length=50 ) return results def _search_chinese( self, query, docs, substrings_on=True, parse=1, min_matches=1, progress=False ): """ convenience method to search chinese text """ original_query = query if not isinstance(query, str): raise ValueError("query must be str") if parse > 0: q = U.split_chinese(query)[0] num_words = len(q.split()) query = build_ngrams(q, n=parse) query = ["".join(q) for q in query] return self._search(query, docs, substrings_on=substrings_on, progress=progress)
Methods
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True)
-
executes self.queries on supplied list of documents

Args:
  docs(list of str): list of texts
  case_sensitive(bool): If True, case sensitive search
  keys(list): list of keys for supplied docs (e.g., file paths). default: key is index in range(len(docs))
  min_matches(int): results must have at least this many word matches
  progress(bool): whether or not to show progress bar
Returns:
  list of tuples of results of the form: (key, query, no. of matches)
  For Chinese, no. of matches will be the number of unique Jieba-extracted character sequences that match
Expand source code
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True): """ ``` executes self.queries on supplied list of documents Args: docs(list of str): list of chinese texts case_sensitive(bool): If True, case sensitive search keys(list): list keys for supplied docs (e.g., file paths). default: key is index in range(len(docs)) min_matches(int): results must have at least these many word matches progress(bool): whether or not to show progress bar Returns: list of tuples of results of the form: (key, query, no. of matches) For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match ``` """ if isinstance(docs, str): docs = [docs] if keys and len(keys) != len(docs): raise ValueError("lengths of keys and docs must be the same") results = [] l = len(docs) for idx, text in enumerate(docs): for q in self.queries: if U.is_chinese(self.lang): r = self._search_chinese( q, [text], min_matches=min_matches, parse=1, progress=False ) elif self.lang == "ar": r = self._search( q, [text], case_sensitive=case_sensitive, min_matches=min_matches, progress=False, substrings_on=True, ) else: r = self._search( q, [text], case_sensitive=case_sensitive, min_matches=min_matches, progress=False, substrings_on=False, ) if not r: continue r = r[0] k = idx if keys: k = keys[idx] num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2]) results.append((k, q, num_matches)) if progress: printProgressBar( idx + 1, l, prefix="progress: ", suffix="complete", length=50 ) return results
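A usage sketch for Searcher.search; the queries, documents, and keys are illustrative.

from ktrain.text.shallownlp import Searcher

searcher = Searcher(["machine learning", "deep learning"])
docs = [
    "Deep learning has transformed NLP.",
    "This text is about cooking.",
]
# each result is (key, query, number of matches); only documents with hits are returned
hits = searcher.search(docs, keys=["a.txt", "b.txt"], progress=False)
print(hits)  # e.g., [('a.txt', 'deep learning', 1)]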