Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import logging
from typing import cast

from nucliadb.common.datamanagers.conversations import KB_CONVERSATION_SPLITS_METADATA
from nucliadb.common.maindb.driver import Transaction
from nucliadb.common.maindb.pg import PGTransaction
from nucliadb.ingest.fields.conversation import (
CONVERSATION_SPLITS_METADATA,
Conversation,
)
from nucliadb.ingest.orm.knowledgebox import KnowledgeBox as KnowledgeBoxORM
Expand Down Expand Up @@ -82,7 +82,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
splits_metadata = await build_splits_metadata(
txn2, context.blob_storage, kbid, rid, field_id
)
splits_metadata_key = CONVERSATION_SPLITS_METADATA.format(
splits_metadata_key = KB_CONVERSATION_SPLITS_METADATA.format(
kbid=kbid, uuid=rid, type="c", field=field_id
)
await txn2.set(splits_metadata_key, splits_metadata.SerializeToString())
Expand Down
2 changes: 2 additions & 0 deletions nucliadb/src/nucliadb/common/datamanagers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from . import (
atomic,
cluster,
conversations,
exceptions,
fields,
graph_vectorsets,
Expand All @@ -48,6 +49,7 @@
__all__ = (
"atomic",
"cluster",
"conversations",
"exceptions",
"fields",
"graph_vectorsets",
Expand Down
132 changes: 132 additions & 0 deletions nucliadb/src/nucliadb/common/datamanagers/conversations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at info@nuclia.com.
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from nucliadb.common.maindb.driver import Transaction
from nucliadb_protos.resources_pb2 import Conversation as PBConversation
from nucliadb_protos.resources_pb2 import FieldConversation, SplitsMetadata

# Key templates for conversation field data in the main KV store.
# All three share the same per-field base path; the metadata key IS that base
# path, so it is a string prefix of both the page keys and the splits-metadata key.

# One key per conversation page; `{page}` is the page number.
KB_CONVERSATION_PAGE = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}/{page}"
# Single key per field holding the serialized splits metadata.
KB_CONVERSATION_SPLITS_METADATA = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}/splits_metadata"
# Single key per field holding the serialized conversation metadata (also used
# as the deletion prefix for the whole field).
KB_CONVERSATION_METADATA = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}"


async def get_page(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
    page: int,
) -> PBConversation | None:
    """Read and parse a single page of a conversation field.

    Returns the parsed page, or ``None`` when nothing is stored under the
    page key.

    Raises:
        ValueError: if ``page`` is zero or negative.
    """
    # Reject invalid page numbers before touching the store.
    if page <= 0:
        raise ValueError("Conversation pages start at index 1")

    raw = await txn.get(
        KB_CONVERSATION_PAGE.format(
            kbid=kbid,
            uuid=rid,
            type=field_type,
            field=field_id,
            page=page,
        )
    )
    if raw is not None:
        conversation = PBConversation()
        conversation.ParseFromString(raw)
        return conversation
    return None


async def set_page(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
    page: int,
    value: PBConversation,
) -> None:
    """Serialize ``value`` and write it under the key of the given page."""
    # NOTE(review): unlike get_page, the page number is not validated here —
    # confirm whether callers may legitimately write page 0.
    location = {
        "kbid": kbid,
        "uuid": rid,
        "type": field_type,
        "field": field_id,
        "page": page,
    }
    page_key = KB_CONVERSATION_PAGE.format(**location)
    await txn.set(page_key, value.SerializeToString())


async def get_metadata(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
) -> FieldConversation | None:
    """Read and parse the metadata record of a conversation field.

    Returns ``None`` when no value is stored under the field's metadata key.
    """
    metadata_key = KB_CONVERSATION_METADATA.format(
        kbid=kbid,
        uuid=rid,
        type=field_type,
        field=field_id,
    )
    stored = await txn.get(metadata_key)
    if stored is not None:
        metadata = FieldConversation()
        metadata.ParseFromString(stored)
        return metadata
    return None


async def set_metadata(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
    metadata: FieldConversation,
) -> None:
    """Serialize ``metadata`` and write it under the field's metadata key."""
    await txn.set(
        KB_CONVERSATION_METADATA.format(
            kbid=kbid,
            uuid=rid,
            type=field_type,
            field=field_id,
        ),
        metadata.SerializeToString(),
    )


async def delete_field(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
) -> None:
    """Delete every key that starts with the field's metadata key.

    The page and splits-metadata key templates extend the metadata key, so
    this removes them together with the metadata record itself.
    """
    # NOTE(review): the prefix carries no trailing separator, so keys of a
    # sibling field whose id begins with ``field_id`` would presumably match
    # too — confirm against the delete_by_prefix implementation.
    await txn.delete_by_prefix(
        KB_CONVERSATION_METADATA.format(
            kbid=kbid,
            uuid=rid,
            type=field_type,
            field=field_id,
        )
    )


async def get_splits_metadata(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
) -> SplitsMetadata | None:
    """Read and parse the splits metadata of a conversation field.

    Returns ``None`` when no value is stored under the splits-metadata key.
    """
    location = {"kbid": kbid, "uuid": rid, "type": field_type, "field": field_id}
    stored = await txn.get(KB_CONVERSATION_SPLITS_METADATA.format(**location))
    if stored is not None:
        splits = SplitsMetadata()
        splits.ParseFromString(stored)
        return splits
    return None


async def set_splits_metadata(
    txn: Transaction,
    *,
    kbid: str,
    rid: str,
    field_type: str,
    field_id: str,
    splits_metadata: SplitsMetadata,
) -> None:
    """Serialize ``splits_metadata`` and write it under the field's splits-metadata key."""
    await txn.set(
        KB_CONVERSATION_SPLITS_METADATA.format(
            kbid=kbid,
            uuid=rid,
            type=field_type,
            field=field_id,
        ),
        splits_metadata.SerializeToString(),
    )
6 changes: 6 additions & 0 deletions nucliadb/src/nucliadb/common/datamanagers/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ async def slug_exists(txn: Transaction, *, kbid: str, slug: str) -> bool:
return encoded_slug not in (None, b"")


async def set_slug(txn: Transaction, *, kbid: str, rid: str, slug: str) -> None:
    """Store the encoded resource uuid ``rid`` under the slug key for ``slug`` in ``kbid``."""
    await txn.set(
        KB_RESOURCE_SLUG.format(kbid=kbid, slug=slug),
        rid.encode(),
    )


async def modify_slug(txn: Transaction, *, kbid: str, rid: str, new_slug: str) -> str:
basic = await get_basic(txn, kbid=kbid, rid=rid)
if basic is None:
Expand Down
100 changes: 53 additions & 47 deletions nucliadb/src/nucliadb/ingest/fields/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections import defaultdict
from typing import Any, Iterable

from nucliadb.common import datamanagers
from nucliadb.ingest import logger
from nucliadb.ingest.fields.base import Field
from nucliadb.ingest.fields.exceptions import FieldAuthorNotFound
Expand All @@ -32,10 +33,6 @@

PAGE_SIZE = 200

CONVERSATION_PAGE_VALUE = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}/{page}"
CONVERSATION_SPLITS_METADATA = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}/splits_metadata"
CONVERSATION_METADATA = "/kbs/{kbid}/r/{uuid}/f/{type}/{field}"


class PageNotFound(Exception):
pass
Expand All @@ -56,10 +53,14 @@ def __init__(self, id: str, resource: Any):
self.metadata = None

async def delete_value(self):
await self.resource.txn.delete_by_prefix(
CONVERSATION_METADATA.format(kbid=self.kbid, uuid=self.rid, type=self.type, field=self.id)
await datamanagers.conversations.delete_field(
self.resource.txn,
kbid=self.kbid,
rid=self.rid,
field_type=self.type,
field_id=self.id,
)
self._split_metadata = None
self._splits_metadata = None
self.metadata = None
self.value.clear()

Expand Down Expand Up @@ -175,15 +176,15 @@ async def get_full_conversation(self) -> PBConversation:

async def get_metadata(self) -> FieldConversation:
if self.metadata is None:
payload = await self.resource.txn.get(
CONVERSATION_METADATA.format(
kbid=self.kbid, uuid=self.rid, type=self.type, field=self.id
)
self.metadata = await datamanagers.conversations.get_metadata(
self.resource.txn,
kbid=self.kbid,
rid=self.rid,
field_type=self.type,
field_id=self.id,
)
self.metadata = FieldConversation()
if payload:
self.metadata.ParseFromString(payload)
else:
if self.metadata is None:
self.metadata = FieldConversation()
self.metadata.size = PAGE_SIZE
self.metadata.pages = 0
self.metadata.total = 0
Expand All @@ -192,67 +193,72 @@ async def get_metadata(self) -> FieldConversation:

async def db_get_value(self, page: int = 1):
if page == 0:
raise ValueError(f"Conversation pages start at index 1")
raise ValueError("Conversation pages start at index 1")

if self.value.get(page) is None:
field_key = CONVERSATION_PAGE_VALUE.format(
pb = await datamanagers.conversations.get_page(
self.resource.txn,
kbid=self.kbid,
uuid=self.rid,
type=self.type,
field=self.id,
rid=self.rid,
field_type=self.type,
field_id=self.id,
page=page,
)
payload = await self.resource.txn.get(field_key)
if payload is None:
if pb is None:
raise PageNotFound()
self.value[page] = PBConversation()
self.value[page].ParseFromString(payload)
self.value[page] = pb
return self.value[page]

async def db_set_value(self, payload: PBConversation, page: int = 0):
field_key = CONVERSATION_PAGE_VALUE.format(
kbid=self.kbid, uuid=self.rid, type=self.type, field=self.id, page=page
)
await self.resource.txn.set(
field_key,
payload.SerializeToString(),
await datamanagers.conversations.set_page(
self.resource.txn,
kbid=self.kbid,
rid=self.rid,
field_type=self.type,
field_id=self.id,
page=page,
value=payload,
)
self.value[page] = payload
self.resource.modified = True

async def db_set_metadata(self, payload: FieldConversation):
await self.resource.txn.set(
CONVERSATION_METADATA.format(kbid=self.kbid, uuid=self.rid, type=self.type, field=self.id),
payload.SerializeToString(),
await datamanagers.conversations.set_metadata(
self.resource.txn,
kbid=self.kbid,
rid=self.rid,
field_type=self.type,
field_id=self.id,
metadata=payload,
)
self.metadata = payload
self.resource.modified = True
self._created = False

async def get_splits_metadata(self) -> SplitsMetadata:
if self._splits_metadata is None:
field_key = CONVERSATION_SPLITS_METADATA.format(
pb = await datamanagers.conversations.get_splits_metadata(
self.resource.txn,
kbid=self.kbid,
uuid=self.rid,
type=self.type,
field=self.id,
rid=self.rid,
field_type=self.type,
field_id=self.id,
)
payload = await self.resource.txn.get(field_key)
if payload is None:
if pb is None:
return SplitsMetadata()
self._splits_metadata = SplitsMetadata()
self._splits_metadata.ParseFromString(payload)
self._splits_metadata = pb
return self._splits_metadata

async def set_splits_metadata(self, payload: SplitsMetadata) -> None:
key = CONVERSATION_SPLITS_METADATA.format(
await datamanagers.conversations.set_splits_metadata(
self.resource.txn,
kbid=self.kbid,
uuid=self.rid,
type=self.type,
field=self.id,
rid=self.rid,
field_type=self.type,
field_id=self.id,
splits_metadata=payload,
)
await self.resource.txn.set(key, payload.SerializeToString())
self._split_metadata = payload
self._splits_metadata = payload
self.resource.modified = True

async def delete_messages(self, message_idents: Iterable[str]) -> int:
Expand Down
4 changes: 1 addition & 3 deletions nucliadb/src/nucliadb/ingest/orm/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import Any, cast

from nucliadb.common import datamanagers, file_md5
from nucliadb.common.datamanagers.resources import KB_RESOURCE_SLUG
from nucliadb.common.ids import FIELD_TYPE_PB_TO_STR, FIELD_TYPE_STR_TO_PB
from nucliadb.common.maindb.driver import Transaction
from nucliadb.ingest.fields.base import Field
Expand Down Expand Up @@ -123,8 +122,7 @@ async def get(cls, txn: Transaction, kbid: str, rid: str) -> Resource | None:

async def set_slug(self):
basic = await self.get_basic()
new_key = KB_RESOURCE_SLUG.format(kbid=self.kbid, slug=basic.slug)
await self.txn.set(new_key, self.uuid.encode())
await datamanagers.resources.set_slug(self.txn, kbid=self.kbid, rid=self.uuid, slug=basic.slug)

# Basic
async def get_basic(self) -> PBBasic:
Expand Down
Loading
Loading