Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions udata/core/activity/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from mongoengine.errors import DoesNotExist

from udata.api import API, api, fields
from udata.auth import current_user
from udata.core.dataset.permissions import OwnableReadPermission
from udata.core.organization.api_fields import org_ref_fields
from udata.core.owned import Owned
from udata.core.user.api_fields import user_ref_fields
from udata.models import Activity, db

Expand Down Expand Up @@ -101,18 +102,16 @@ def get(self):
# Always return a result even not complete
# But log the error (ie. visible in sentry, silent for user)
# Can happen when someone manually delete an object in DB (ie. without proper purge)
# - Filter out private items (except for sysadmin users)
# - Filter out items not visible to the current user
safe_items = []
for item in qs.queryset.items:
try:
item.related_to
except DoesNotExist as e:
log.error(e, exc_info=True)
else:
if hasattr(item.related_to, "private") and (
current_user.is_anonymous or not current_user.sysadmin
):
if item.related_to.private:
if isinstance(item.related_to, Owned):
if not OwnableReadPermission(item.related_to).can():
continue
safe_items.append(item)
qs.queryset.items = safe_items
Expand Down
31 changes: 31 additions & 0 deletions udata/core/dataset/permissions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from flask_principal import Permission as BasePermission
from flask_principal import RoleNeed

from udata.auth import Permission, UserNeed
from udata.core.organization.permissions import (
OrganizationAdminNeed,
Expand All @@ -22,6 +25,34 @@ def __init__(self, obj):
super(OwnablePermission, self).__init__(*needs)


class OwnableReadPermission(BasePermission):
"""Permission to read a private ownable object.

Always grants access if the object is not private.
For private objects, requires owner, org member, or sysadmin.

We inherit from BasePermission instead of udata's Permission because
Permission automatically adds RoleNeed("admin") to all needs. This means
a permission with no needs would only allow admins. With BasePermission,
an empty needs set allows everyone (Flask-Principal returns True when
self.needs is empty).
"""

def __init__(self, obj):
if not getattr(obj, "private", False):
super().__init__()
return

needs = [RoleNeed("admin")]
if obj.organization:
needs.append(OrganizationAdminNeed(obj.organization.id))
needs.append(OrganizationEditorNeed(obj.organization.id))
elif obj.owner:
needs.append(UserNeed(obj.owner.fs_uniquifier))

super().__init__(*needs)


class DatasetEditPermission(OwnablePermission):
"""Permissions to edit a Dataset"""

Expand Down
36 changes: 36 additions & 0 deletions udata/tests/api/test_activities_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from udata.core.activity.models import Activity
from udata.core.dataset.factories import DatasetFactory
from udata.core.dataset.models import Dataset
from udata.core.organization.factories import OrganizationFactory
from udata.core.reuse.factories import ReuseFactory
from udata.core.reuse.models import Reuse
from udata.core.topic.factories import TopicFactory
Expand Down Expand Up @@ -111,3 +112,38 @@ def test_activity_api_with_topic(self) -> None:
assert activity_data["related_to_id"] == str(topic.id)
assert activity_data["related_to_kind"] == "Topic"
assert activity_data["related_to_url"] == topic.self_api_url()

def test_activity_api_list_with_private_visible_to_owner(self) -> None:
"""Owner should see activities about their own private objects."""
owner = UserFactory()
dataset = DatasetFactory(private=True, owner=owner)
FakeDatasetActivity.objects.create(actor=UserFactory(), related_to=dataset)

# Anonymous user won't see it
response = self.get(url_for("api.activity"))
assert200(response)
assert len(response.json["data"]) == 0

# Owner should see their own private dataset activity
self.login(owner)
response = self.get(url_for("api.activity"))
assert200(response)
assert len(response.json["data"]) == 1

def test_activity_api_list_with_private_visible_to_org_member(self) -> None:
"""Organization members should see activities about their org's private objects."""
member = UserFactory()
org = OrganizationFactory(admins=[member])
dataset = DatasetFactory(private=True, organization=org)
FakeDatasetActivity.objects.create(actor=UserFactory(), related_to=dataset)

# Anonymous user won't see it
response = self.get(url_for("api.activity"))
assert200(response)
assert len(response.json["data"]) == 0

# Org member should see the private dataset activity
self.login(member)
response = self.get(url_for("api.activity"))
assert200(response)
assert len(response.json["data"]) == 1
82 changes: 81 additions & 1 deletion udata/tests/test_owned.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from mongoengine import post_save

import udata.core.owned as owned
from udata.core.dataset.permissions import OwnableReadPermission
from udata.core.organization.factories import OrganizationFactory
from udata.core.organization.models import Organization
from udata.core.user.factories import AdminFactory, UserFactory
from udata.core.user.models import User
from udata.models import Member
from udata.mongo import db
from udata.tests.api import DBTestCase
from udata.tests.api import APITestCase, DBTestCase


class CustomQuerySet(owned.OwnedQuerySet):
Expand Down Expand Up @@ -265,3 +266,82 @@ def test_visible_by_user(self) -> None:
name="private_owned_by_other_user"
)
self.assertEqual(len(result), 0)


class OwnableReadPermissionTest(APITestCase):
def setUp(self):
super().setUp()
from flask import g
from flask_principal import AnonymousIdentity

g.identity = AnonymousIdentity()

def test_public_object_visible_by_anonymous(self):
"""Public objects should be visible by anonymous users."""
obj = Owned.objects.create(owner=UserFactory(), private=False)
assert OwnableReadPermission(obj).can() is True

def test_public_object_visible_by_authenticated(self):
"""Public objects should be visible by authenticated users."""
obj = Owned.objects.create(owner=UserFactory(), private=False)
self.login()
assert OwnableReadPermission(obj).can() is True

def test_private_object_not_visible_by_anonymous(self):
"""Private objects should not be visible by anonymous users."""
obj = Owned.objects.create(owner=UserFactory(), private=True)
assert OwnableReadPermission(obj).can() is False

def test_private_object_not_visible_by_other_user(self):
"""Private objects should not be visible by other users."""
obj = Owned.objects.create(owner=UserFactory(), private=True)
self.login()
assert OwnableReadPermission(obj).can() is False

def test_private_object_visible_by_owner(self):
"""Private objects should be visible by their owner."""
owner = UserFactory()
obj = Owned.objects.create(owner=owner, private=True)
self.login(owner)
assert OwnableReadPermission(obj).can() is True

def test_private_object_visible_by_org_admin(self):
"""Private objects should be visible by organization admins."""
admin = UserFactory()
org = OrganizationFactory(members=[Member(user=admin, role="admin")])
obj = Owned.objects.create(organization=org, private=True)
self.login(admin)
assert OwnableReadPermission(obj).can() is True

def test_private_object_visible_by_org_editor(self):
"""Private objects should be visible by organization editors."""
editor = UserFactory()
org = OrganizationFactory(members=[Member(user=editor, role="editor")])
obj = Owned.objects.create(organization=org, private=True)
self.login(editor)
assert OwnableReadPermission(obj).can() is True

def test_private_object_not_visible_by_other_org_member(self):
"""Private objects should not be visible by members of other organizations."""
member = UserFactory()
OrganizationFactory(members=[Member(user=member, role="admin")])
org = OrganizationFactory()
obj = Owned.objects.create(organization=org, private=True)
self.login(member)
assert OwnableReadPermission(obj).can() is False

def test_private_object_visible_by_admin(self):
"""Private objects should be visible by sysadmins."""
admin = AdminFactory()
obj = Owned.objects.create(owner=UserFactory(), private=True)
self.login(admin)
assert OwnableReadPermission(obj).can() is True

def test_object_without_private_attribute(self):
"""Objects without private attribute should be visible by everyone."""

class OwnedWithoutPrivate(owned.Owned, db.Document):
name = db.StringField()

obj = OwnedWithoutPrivate.objects.create(owner=UserFactory())
assert OwnableReadPermission(obj).can() is True