7 changes: 7 additions & 0 deletions CoScientist/papers_processing_refactoring/app/config_loader.py
@@ -0,0 +1,7 @@
from functools import lru_cache
from .settings import AppSettings


@lru_cache
def get_settings() -> AppSettings:
return AppSettings()
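
A quick usage sketch (this is the same import path main_test.py uses below); because of @lru_cache, every call returns the same cached AppSettings instance:

from CoScientist.papers_processing_refactoring.app.config_loader import get_settings

settings = get_settings()
assert settings is get_settings()  # lru_cache makes this a process-wide singleton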
172 changes: 172 additions & 0 deletions CoScientist/papers_processing_refactoring/app/main_test.py
@@ -0,0 +1,172 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
import time
import threading

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from marker.models import create_model_dict

from CoScientist.papers_processing_refactoring.app.config_loader import get_settings
from CoScientist.papers_processing_refactoring.etl import (
    ETLContext, ETLPipeline, FetchStep, ParseStep, HtmlCleaningStep,
    ImageFilteringStep, ImageCaptioningStep, PaperSummarisatonStep,
    ChunkingStep, EmbeddingStep, PublishStep,
)
from CoScientist.papers_processing_refactoring.embeddings import create_embedding_model
from CoScientist.papers_processing_refactoring.scheduling.scheduler import IngestionScheduler, Schedule
from CoScientist.papers_processing_refactoring.sources.local import LocalSource
from CoScientist.papers_processing_refactoring.storage.state.state_db import SQLiteStateManager
from CoScientist.papers_processing_refactoring.storage.artifacts import S3ETLArtifactStore, S3DomainArtifactStore
from CoScientist.papers_processing_refactoring.storage.vector import ChromaVectorStore
from CoScientist.papers_processing_refactoring.definitions import CONFIG_PATH

MAX_WORKERS = 3  # number of articles processed concurrently

load_dotenv(CONFIG_PATH)
settings = get_settings()


def build_services(etl_settings):
embedding_model = create_embedding_model({
"type": etl_settings.embeddings.type,
"url": etl_settings.embeddings.api_url,
"model_name": etl_settings.embeddings.model_name,
"batch_size": etl_settings.embeddings.batch_size,
})
return {
"embedding_model": embedding_model
}


def build_vector_store(etl_settings):
if etl_settings.vectordb.backend == "chromadb":
return ChromaVectorStore(
etl_settings.vectordb.chroma.host,
etl_settings.vectordb.chroma.port,
etl_settings.vectordb.chroma.collection
)
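    # elif etl_settings.vectordb.backend == "qdrant":
    #     # Sketch only: QdrantVectorStore is hypothetical here, mirroring the
    #     # ChromaVectorStore call above and the QdrantSettings in settings.py.
    #     return QdrantVectorStore(
    #         etl_settings.vectordb.qdrant.host,
    #         etl_settings.vectordb.qdrant.port,
    #         etl_settings.vectordb.qdrant.collection
    #     )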
else:
raise ValueError("Vector store configuration must be provided")


def build_artifacts_stores(etl_settings):
    """Create the internal ETL artifact store and the public-facing domain store."""
etl_art_store = S3ETLArtifactStore(
endpoint=etl_settings.s3.endpoint,
access_key=etl_settings.s3.access_key,
secret_key=etl_settings.s3.secret_key,
bucket=etl_settings.s3.etl_bucket
)
public_art_store = S3DomainArtifactStore(
endpoint=etl_settings.s3.endpoint,
access_key=etl_settings.s3.access_key,
secret_key=etl_settings.s3.secret_key,
bucket=etl_settings.s3.public_bucket # Delete after testing
)
return etl_art_store, public_art_store


def build_state_store(etl_settings):
if etl_settings.database.type == "sqlite":
return SQLiteStateManager(etl_settings.database.sqlite_path)
# elif etl_settings.database.type == "postgres":
# return PostgreSQLStateManager(etl_settings.database.postgres.dsn)
else:
raise ValueError("State store configuration must be provided")


def process_single_article(article, app_settings, shared_models, parse_lock):
    """Run the full ETL pipeline for one article inside a worker thread."""
    print(f"[{article.name}] Thread started...")

state_manager = build_state_store(app_settings)

if state_manager.get_status(article.id, "publish") == "done":
return f"[{article.name}] Already processed. Skipped."

if any(elem["status"] == "running" for elem in state_manager.list_states(article.id)):
return f"[{article.name}] Processing is already running. Skipped."

    local_source = LocalSource(app_settings.files.directory)
    vector_store = build_vector_store(app_settings)
    artifact_store, public_store = build_artifacts_stores(app_settings)
    llm_model = ChatOpenAI(
        model=app_settings.llm.llm_name,
        base_url=app_settings.llm.llm_base_url,
        api_key=app_settings.llm.llm_api_key,  # noqa
        temperature=0.1
    )
    embedding_model = build_services(app_settings)["embedding_model"]

pipeline = ETLPipeline(
steps=[
FetchStep(source=local_source),
ParseStep(shared_models=shared_models, parse_lock=parse_lock),
HtmlCleaningStep(),
ImageFilteringStep(),
ImageCaptioningStep(),
PaperSummarisatonStep(),
ChunkingStep(),
EmbeddingStep(),
PublishStep()
]
)

ctx = ETLContext(
article=article,
state_manager=state_manager,
artifact_store=artifact_store,
public_store=public_store,
vector_store=vector_store,
llm=llm_model,
embedding_model=embedding_model
)

start = time.perf_counter()

try:
pipeline.run(ctx)
end = time.perf_counter()
return f"[{article.id}] Success in {end - start:.2f}s"
except Exception as e:
end = time.perf_counter()
return f"[{article.id}] Failed: {str(e)} in {end - start:.2f}s"


def handle_articles_batch(articles, shared_models, parse_lock):
    """Fan a batch of articles out to a thread pool and print each result."""
    print(f"Scheduler found {len(articles)} articles. Starting parallel processing...")

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_article = {
executor.submit(process_single_article, art, settings, shared_models, parse_lock): art
for art in articles
}

for future in as_completed(future_to_article):
result_msg = future.result()
print(result_msg)


def main():
    """Entry point: reset stale running states, then poll sources and process batches forever."""
    print("Starting Papers ETL Daemon...")

with build_state_store(settings) as state_manager:
print("Cleaning up hanging tasks...")
state_manager.reset_running_states()

shared_parser_models = create_model_dict()
parse_lock = threading.Lock()

local_source = LocalSource(settings.files.directory)

scheduler = IngestionScheduler(
on_batch=lambda batch: handle_articles_batch(batch, shared_parser_models, parse_lock)
)
scheduler.register(local_source, Schedule(timedelta(minutes=1)))

try:
while True:
scheduler.poll()
time.sleep(10)
except KeyboardInterrupt:
print("Shutting down daemon...")


if __name__ == "__main__":
main()
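
To run the daemon manually, something like the following should work (a sketch, assuming the repository root is on PYTHONPATH so the absolute CoScientist imports resolve):

python -m CoScientist.papers_processing_refactoring.app.main_test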

149 changes: 149 additions & 0 deletions CoScientist/papers_processing_refactoring/app/settings.py
@@ -0,0 +1,149 @@
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field

from CoScientist.papers_processing_refactoring.definitions import CONFIG_PATH


class LLMSettings(BaseSettings):

llm_base_url: str
llm_name: str
llm_api_key: str

model_config = SettingsConfigDict(
env_prefix="ETL_",
extra="ignore",
)


class EmbeddingSettings(BaseSettings):

type: str = Field(default="api")
api_url: str | None = None
model_name: str | None = None
batch_size: int = 16

model_config = SettingsConfigDict(
env_prefix="EMBEDDINGS_",
extra="ignore",
)


class RerankerSettings(BaseSettings):

type: str = "api"
api_url: str | None = None
model_name: str | None = None
batch_size: int = 16

model_config = SettingsConfigDict(
env_prefix="RERANKING_",
extra="ignore",
)


class ChromaSettings(BaseSettings):

host: str = "localhost"
port: int = 8000
collection: str = "default_collection"

model_config = SettingsConfigDict(
env_prefix="CHROMADB_",
extra="ignore",
)


class QdrantSettings(BaseSettings):
host: str = "localhost"
port: int = 6333
collection: str = "articles"

model_config = SettingsConfigDict(
env_prefix="QDRANT_",
extra="ignore",
)


class VectorDBSettings(BaseSettings):

backend: str = Field(default="chromadb", alias="VECTOR_DB")

chroma: ChromaSettings = Field(default_factory=ChromaSettings)
qdrant: QdrantSettings = Field(default_factory=QdrantSettings)

model_config = SettingsConfigDict(
extra="ignore",
)


class S3BucketSettings(BaseSettings):

bucket: str

model_config = SettingsConfigDict(extra="ignore")


class S3Settings(BaseSettings):

endpoint: str
access_key: str
secret_key: str
etl_bucket: str
public_bucket: str

model_config = SettingsConfigDict(
env_prefix="S3_",
extra="ignore",
)


class PostgresSettings(BaseSettings):

dsn: str

model_config = SettingsConfigDict(
env_prefix="POSTGRES_",
extra="ignore",
)


class DatabaseSettings(BaseSettings):

type: str = Field(default="sqlite", alias="DATABASE_TYPE")
sqlite_path: str = Field(default="./data/db.sqlite", alias="SQLITE_PATH")

postgres: PostgresSettings | None = None

model_config = SettingsConfigDict(
extra="ignore",
)


class FilesSettings(BaseSettings):
directory: Path

model_config = SettingsConfigDict(
env_prefix="FILES_",
extra="ignore",
)


class AppSettings(BaseSettings):

embeddings: EmbeddingSettings = Field(default_factory=EmbeddingSettings)
reranker: RerankerSettings = Field(default_factory=RerankerSettings)
vectordb: VectorDBSettings = Field(default_factory=VectorDBSettings)
s3: S3Settings = Field(default_factory=S3Settings)
database: DatabaseSettings = Field(default_factory=DatabaseSettings)
llm: LLMSettings = Field(default_factory=LLMSettings)
files: FilesSettings = Field(default_factory=FilesSettings)

model_config = SettingsConfigDict(
env_file=CONFIG_PATH,
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False
)
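
For reference, a minimal config.env sketch that would satisfy these settings (all values are placeholders; env var names follow the env_prefix and alias declarations above, and main_test.py exports this file into the process environment via load_dotenv, which is what the nested default_factory sub-settings read from):

# LLM (env_prefix="ETL_")
ETL_LLM_BASE_URL=https://llm.example.com/v1
ETL_LLM_NAME=my-llm-model
ETL_LLM_API_KEY=sk-placeholder

# Embeddings (env_prefix="EMBEDDINGS_")
EMBEDDINGS_TYPE=api
EMBEDDINGS_API_URL=http://localhost:8080/embed
EMBEDDINGS_MODEL_NAME=my-embedding-model
EMBEDDINGS_BATCH_SIZE=16

# Vector DB (backend via alias VECTOR_DB; Chroma fields use env_prefix="CHROMADB_")
VECTOR_DB=chromadb
CHROMADB_HOST=localhost
CHROMADB_PORT=8000
CHROMADB_COLLECTION=papers

# S3 (env_prefix="S3_")
S3_ENDPOINT=http://localhost:9000
S3_ACCESS_KEY=placeholder
S3_SECRET_KEY=placeholder
S3_ETL_BUCKET=etl-artifacts
S3_PUBLIC_BUCKET=public-artifacts

# State DB (aliases DATABASE_TYPE / SQLITE_PATH; POSTGRES_DSN only if type=postgres)
DATABASE_TYPE=sqlite
SQLITE_PATH=./data/db.sqlite

# Input files (env_prefix="FILES_")
FILES_DIRECTORY=./papers_inbox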
4 changes: 4 additions & 0 deletions CoScientist/papers_processing_refactoring/definitions.py
@@ -0,0 +1,4 @@
from pathlib import Path

ROOT_DIR = Path(__file__).parent.parent.parent.absolute()
CONFIG_PATH = ROOT_DIR / 'config.env'
49 changes: 49 additions & 0 deletions CoScientist/papers_processing_refactoring/domain/entities.py
@@ -0,0 +1,49 @@
from enum import Enum
from pathlib import Path
from typing import Any, Literal, Dict, Optional, Union, Mapping

from pydantic import BaseModel

# from CoScientist.papers.sources.base import ArticleSource


# TODO: maybe add separate classes for source types (as Enum) and source references
class Article(BaseModel):
id: str
source_type: Literal["local", "remote"]
source_ref: Union[str, Path]
name: str
domain: str = "default"
metadata: Optional[Dict[str, Any]] = None
# source: ArticleSource


class ChunkRole(str, Enum):
BODY = "body"
SUMMARY = "summary"
IMAGE_CAPTION = "image_caption"
TABLE = "table"


class Chunk(BaseModel):
id: str
article_id: str
domain: Optional[str] = None
modality: Literal["text", "image"]
content: str
metadata: Optional[Mapping[str, Any]] = None
    role: ChunkRole  # str-valued enum; plain strings like "body" still validate


class KnowledgeDomain(BaseModel):
name: str
description: str


class ImageInfo(BaseModel):
id: str
file_name: str
    original_src: Any  # "str | Any" collapses to Any; typically the image's original src string
is_kept: bool = True
caption: Optional[str] = None
final_s3_url: Optional[str] = None
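

# A minimal construction sketch, continuing from the definitions above
# (IDs, names, and paths are placeholder values):
#
# article = Article(
#     id="local-0001",
#     source_type="local",
#     source_ref=Path("./papers_inbox/paper_0001.pdf"),
#     name="paper_0001.pdf",
#     domain="materials_science",
# )
#
# chunk = Chunk(
#     id="local-0001-chunk-0",
#     article_id=article.id,
#     domain=article.domain,
#     modality="text",
#     content="Abstract: ...",
#     metadata={"page": 1},
#     role=ChunkRole.BODY,
# )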