"""Tests for RAG text extraction, chunking, and the ``RagService`` ingest/retrieve/delete flow."""

from __future__ import annotations

from unittest.mock import AsyncMock

import pytest

from finrep_algo_agent.config import Settings
from finrep_algo_agent.rag.ingestion import chunk_text, extract_text_from_upload
from finrep_algo_agent.rag.vectorstore import InMemoryRagStore
from finrep_algo_agent.schemas.rag import RagDocumentIn
from finrep_algo_agent.skills.rag_retrieve import RagService


def test_extract_plain_text_utf8() -> None:
    """A UTF-8 encoded ``.txt`` upload exposes its text and reports no warning."""
    payload = "融资说明\n第二行".encode()

    extracted = extract_text_from_upload(filename="a.txt", data=payload)

    assert "融资说明" in extracted.text
    assert not extracted.warning


def test_chunk_text_splits_and_non_empty() -> None:
    """Text far longer than ``chunk_size`` yields at least two chunks, all truthy."""
    head = "第一段。\n\n"
    filler = "字" * 500
    tail = "\n\n尾段"

    pieces = chunk_text(head + filler + tail, chunk_size=120, overlap=20)

    assert len(pieces) >= 2
    assert all(pieces)


@pytest.mark.asyncio
async def test_rag_service_ingest_retrieve() -> None:
    """Ingest one document with stubbed embeddings, then retrieve hits for a query."""

    async def fake_embeddings(texts: list[str]) -> list[list[float]]:
        # Deterministic 4-dimensional vectors derived from position and text length.
        vectors: list[list[float]] = []
        for position, text in enumerate(texts):
            vectors.append([float(position % 5), float(len(text) % 3), 0.0, 1.0])
        return vectors

    async def fake_embedding(q: str) -> list[float]:
        # The query vector is fixed regardless of the query text.
        return [1.0, 0.0, 0.0, 0.0]

    config = Settings(
        rag_chunk_size=200,
        rag_chunk_overlap=40,
        rag_default_top_k=3,
        rag_embedding_batch_size=8,
    )
    memory_store = InMemoryRagStore()
    # Keyword arguments to the mock constructor are set as attributes on the mock.
    llm_stub = AsyncMock(
        embeddings=AsyncMock(side_effect=fake_embeddings),
        embedding=AsyncMock(side_effect=fake_embedding),
    )
    service = RagService(settings=config, llm=llm_stub, store=memory_store)

    document = RagDocumentIn(
        doc_id="d1",
        title="测试",
        text="融资主体基本情况说明。" * 30,
        source_label="上传材料.pdf",
    )
    await service.ingest("t1", [document], replace=True)

    result = await service.retrieve("t1", "融资 主体", top_k=2, min_score=None)

    assert result.hits
    # NOTE(review): this passes when the marker is in the formatted context OR
    # the first hit merely has non-empty text — confirm that the loose fallback
    # is intended rather than a stricter check on the marker.
    marker_present = "RAG片段" in result.formatted_context
    assert marker_present or result.hits[0].text


@pytest.mark.asyncio
async def test_rag_delete_index() -> None:
    """Chunks stored for a task are gone after ``delete_index`` reports success."""
    task_id = "tx"
    config = Settings(rag_chunk_size=500, rag_chunk_overlap=0)
    memory_store = InMemoryRagStore()
    llm_stub = AsyncMock(
        embeddings=AsyncMock(return_value=[[0.0, 1.0]]),
        embedding=AsyncMock(return_value=[0.0, 1.0]),
    )
    service = RagService(settings=config, llm=llm_stub, store=memory_store)

    short_document = RagDocumentIn(doc_id="a", text="短文本")
    await service.ingest(task_id, [short_document], replace=True)
    assert memory_store.list_task_chunks(task_id)

    assert service.delete_index(task_id)
    assert not memory_store.list_task_chunks(task_id)