# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import time
from abc import ABC, abstractmethod
from typing import List

from paddlex.utils import logging

from .....utils.deps import class_requires_deps, is_dep_available
from .....utils.subclass_register import AutoRegisterABCMetaClass

if is_dep_available("langchain"):
    from langchain.docstore.document import Document
    from langchain.text_splitter import RecursiveCharacterTextSplitter
if is_dep_available("langchain-community"):
    from langchain_community import vectorstores
    from langchain_community.vectorstores import FAISS


@class_requires_deps("langchain", "langchain-community")
class BaseRetriever(ABC, metaclass=AutoRegisterABCMetaClass):
    """Base Retriever"""

    __is_base = True

    VECTOR_STORE_PREFIX = "PADDLEX_VECTOR_STORE"

    def __init__(self):
        """Initializes an instance of base retriever."""
        super().__init__()
        self.model_name = None
        self.embedding = None

    @abstractmethod
    def generate_vector_database(self):
        """
        Declaration of an abstract method. Subclasses are expected to
        provide a concrete implementation of generate_vector_database.
        """
        raise NotImplementedError(
            "The method `generate_vector_database` has not been implemented yet."
        )

    @abstractmethod
    def similarity_retrieval(self):
        """
        Declaration of an abstract method. Subclasses are expected to
        provide a concrete implementation of similarity_retrieval.
        """
        raise NotImplementedError(
            "The method `similarity_retrieval` has not been implemented yet."
        )

    def get_model_name(self) -> str:
        """
        Get the model name used for generating vectors.

        Returns:
            str: The model name.
        """
        return self.model_name

    def is_vector_store(self, s: str) -> bool:
        """
        Check if the given string starts with the vector store prefix.

        Args:
            s (str): The input string to check.

        Returns:
            bool: True if the string starts with the vector store prefix,
                False otherwise.
        """
        return s.startswith(self.VECTOR_STORE_PREFIX)

    def encode_vector_store(self, vector_store_bytes: bytes) -> str:
        """
        Encode the vector store bytes into a base64 string prefixed with the
        vector store prefix.

        Args:
            vector_store_bytes (bytes): The bytes to encode.

        Returns:
            str: The encoded string with the prefix.
        """
        return self.VECTOR_STORE_PREFIX + base64.b64encode(vector_store_bytes).decode(
            "ascii"
        )

    def decode_vector_store(self, vector_store_str: str) -> bytes:
        """
        Decode the vector store string by removing the prefix and decoding
        the base64-encoded payload.

        Args:
            vector_store_str (str): The vector store string with a prefix.

        Returns:
            bytes: The decoded vector store data.
        """
        return base64.b64decode(vector_store_str[len(self.VECTOR_STORE_PREFIX) :])
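    # Illustrative round-trip of the three helpers above (a sketch only, not
    # part of the public API; `retriever` stands for any concrete subclass
    # instance):
    #
    #     encoded = retriever.encode_vector_store(b"\x00\x01")
    #     assert retriever.is_vector_store(encoded)
    #     assert retriever.decode_vector_store(encoded) == b"\x00\x01"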
    def generate_vector_database(
        self,
        text_list: List[str],
        block_size: int = 300,
        separators: List[str] = ["\t", "\n", "。", "\n\n", ""],
    ) -> "FAISS":
        """
        Generates a vector database from a list of texts.

        Args:
            text_list (list[str]): A list of texts to generate the vector
                database from.
            block_size (int): The size of each chunk to split the text into.
            separators (list[str]): A list of separators to use when splitting
                the text.

        Returns:
            FAISS: The generated vector database, or None if building the
                index fails.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=block_size, chunk_overlap=20, separators=separators
        )
        texts = text_splitter.split_text("\t".join(text_list))
        all_splits = [Document(page_content=text) for text in texts]
        try:
            vectorstore = FAISS.from_documents(
                documents=all_splits, embedding=self.embedding
            )
        except ValueError:
            # FAISS.from_documents raises ValueError when there is nothing to
            # index (e.g. an empty split list); treat that as an empty store.
            vectorstore = None
        return vectorstore

    def encode_vector_store_to_bytes(self, vectorstore: "FAISS") -> str:
        """
        Serialize the vector store to bytes and encode it as a prefixed
        base64 string.

        Args:
            vectorstore (FAISS): The vector store to be serialized and encoded.

        Returns:
            str: The encoded vector store.
        """
        if vectorstore is None:
            # An empty store is represented by the bare prefix.
            vectorstore = self.VECTOR_STORE_PREFIX
        else:
            vectorstore = self.encode_vector_store(vectorstore.serialize_to_bytes())
        return vectorstore

    def decode_vector_store_from_bytes(self, vectorstore: str) -> "FAISS":
        """
        Decode a serialized vector store string back into a FAISS vector store.

        Args:
            vectorstore (str): The serialized vector store string.

        Returns:
            FAISS: Deserialized vector store object, or None if the encoded
                store is empty.

        Raises:
            ValueError: If the retrieved vector store is not for PaddleX.
        """
        if not self.is_vector_store(vectorstore):
            raise ValueError("The retrieved vectorstore is not for PaddleX.")
        vectorstore = self.decode_vector_store(vectorstore)
        if vectorstore == b"":
            logging.warning(
                "The retrieved vectorstore is empty; returning an empty vector store."
            )
            return None
        vector = vectorstores.FAISS.deserialize_from_bytes(
            vectorstore,
            embeddings=self.embedding,
            allow_dangerous_deserialization=True,
        )
        return vector

    def similarity_retrieval(
        self,
        query_text_list: List[str],
        vectorstore: "FAISS",
        sleep_time: float = 0.5,
        topk: int = 2,
        min_characters: int = 3500,
    ) -> str:
        """
        Retrieve similar contexts based on a list of query texts.

        Args:
            query_text_list (list[str]): A list of query texts to search for
                similar contexts.
            vectorstore (FAISS): The vector store in which to perform the
                similarity search.
            sleep_time (float): The time to sleep between queries, in seconds.
                Default is 0.5.
            topk (int): The number of results to retrieve per query. Default is 2.
            min_characters (int): The character budget for the concatenated
                result; accumulation stops once adding another context would
                exceed it. Default is 3500.

        Returns:
            str: A concatenated string of the retrieved contexts.
        """
        all_C = ""
        if vectorstore is None:
            return all_C
        for query_text in query_text_list:
            QUESTION = query_text
            # Throttle queries to avoid overloading the embedding service.
            time.sleep(sleep_time)
            docs = vectorstore.similarity_search_with_relevance_scores(
                QUESTION, k=topk
            )
            context = [(document.page_content, score) for document, score in docs]
            context = sorted(context, key=lambda x: x[1])
            # Walk results from highest to lowest relevance score, keeping
            # only those above the score threshold and within the budget.
            for text, score in context[::-1]:
                if score >= -0.1:
                    if len(all_C) + len(text) > min_characters:
                        break
                    all_C += text
        return all_C
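

# Minimal end-to-end sketch (hypothetical; `MyRetriever` and `my_embedding`
# are placeholders for a concrete subclass and a LangChain embedding object,
# neither of which this base module provides):
#
#     retriever = MyRetriever()
#     retriever.embedding = my_embedding
#     store = retriever.generate_vector_database(["text a", "text b"])
#     encoded = retriever.encode_vector_store_to_bytes(store)
#     restored = retriever.decode_vector_store_from_bytes(encoded)
#     context = retriever.similarity_retrieval(["some query"], restored)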