diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..09d4746 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,38 @@ +name: Run python unit tests + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest pytest-cov + pip install -r requirements.txt + + - name: Run tests with coverage + run: | + pytest --cov=./ --cov-report=xml + + - name: Save coverage report + uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: coverage.xml diff --git a/.gitignore b/.gitignore index 51ade83..e2dd01a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ .vscode/launch.json artifacts/* venv -.DS_Store \ No newline at end of file +.venv +.DS_Store +__pycache__ diff --git a/requirements.txt b/requirements.txt index fa0e8d4..dbfd30b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,14 @@ certifi==2024.7.4 charset-normalizer==3.0.1 +defusedxml==0.7.1 docutils==0.19 idna==3.7 +iniconfig==2.0.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.2.2 python-dateutil==2.8.2 requests==2.32.0 six==1.16.0 smmap==5.0.0 urllib3==1.26.19 -defusedxml==0.7.1 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_find_posts.py b/tests/test_find_posts.py new file mode 100644 index 0000000..4df950d --- /dev/null +++ b/tests/test_find_posts.py @@ -0,0 +1,1813 @@ +import json +import re +from datetime import datetime + +import find_posts +import pytest +import requests +from urllib import parse +from requests.models import Response +from unittest.mock import MagicMock, Mock, patch + +from find_posts import ( + add_context_urls, + add_user_posts, + filter_known_users, + get, + get_bookmarks, + get_favourites, + get_lemmy_comment_context, + get_lemmy_urls, + get_list_timeline, + get_list_users, + get_misskey_urls, + get_new_follow_requests, + get_new_followings, + get_peertube_urls, + get_toot_context, + get_user_id, + get_user_posts_mastodon, + get_user_posts_misskey, + parse_lemmy_profile_url, + parse_lemmy_url, + parse_mastodon_profile_url, + parse_mastodon_uri, + parse_mastodon_url, + parse_misskey_url, + parse_peertube_profile_url, + parse_peertube_url, + parse_pixelfed_profile_url, + parse_pixelfed_url, + parse_pleroma_url, + post, + set_server_apis, + user_has_opted_out, +) + + + +@patch("find_posts.get_paginated_mastodon") +def test_get_bookmarks(mock_get_paginated_mastodon): + server = "test_server" + access_token = "test_token" + max = 5 + + get_bookmarks(server, access_token, max) + + mock_get_paginated_mastodon.assert_called_once_with( + f"https://{server}/api/v1/bookmarks", + max, + { + "Authorization": f"Bearer {access_token}", + }, + ) + + +@pytest.mark.parametrize( + "server,access_token,max", + [ + ("test_server1", "test_token1", 2), + ("test_server2", "test_token2", 10), + ], +) +def test_get_bookmarks_parameterized(server, access_token, max): + with patch("find_posts.get_paginated_mastodon") as mock_get_paginated_mastodon: + get_bookmarks(server, access_token, max) + mock_get_paginated_mastodon.assert_called_once_with( + f"https://{server}/api/v1/bookmarks", + max, + { + "Authorization": f"Bearer {access_token}", + }, + ) + + +@patch("find_posts.get_paginated_mastodon") +def test_get_favourites(mock_get_paginated_mastodon): + server = "some.server" + access_token = "token123" + max = 5 + expected_result = "result" + + mock_get_paginated_mastodon.return_value = expected_result + + result = get_favourites(server, access_token, max) + + mock_get_paginated_mastodon.assert_called_once_with( + f"https://{server}/api/v1/favourites", + max, + { + "Authorization": f"Bearer {access_token}", + }, + ) + assert result == expected_result + + +@patch("find_posts.get_user_posts") +@patch("find_posts.add_post_with_context") +@patch("find_posts.logger") +def test_add_user_posts(mock_logger, mock_add_post, mock_get_posts): + server = "test_server" + access_token = "test_token" + followings = [ + {"acct": "user1", "url": "https://user1.com"}, + {"acct": "user2", "url": "https://test_server/user2"}, + ] + known_followings = set() + all_known_users = set() + seen_urls = set() + seen_hosts = set() + + mock_get_posts.return_value = [ + {"url": "https://user1.com/post1"}, + {"url": "https://user1.com/post2"}, + ] + mock_add_post.return_value = True + + add_user_posts( + server, + access_token, + followings, + known_followings, + all_known_users, + seen_urls, + seen_hosts, + ) + + mock_get_posts.assert_called_once_with( + followings[0], known_followings, server, seen_hosts + ) + assert mock_add_post.call_count == 2 + assert len(seen_urls) == 2 + assert "user1" in known_followings + assert "user1" in all_known_users + mock_logger.info.assert_called_with("Added 2 posts for user user1 with 0 errors") + + +@patch("find_posts.get_user_posts") +@patch("find_posts.add_post_with_context") +@patch("find_posts.logger") +def test_add_user_posts_with_no_new_posts(mock_logger, mock_add_post, mock_get_posts): + server = "test_server" + access_token = "test_token" + followings = [{"acct": "user1", "url": "https://user1.com"}] + known_followings = set() + all_known_users = set() + seen_urls = {"https://user1.com/post1", "https://user1.com/post2"} + seen_hosts = set() + + mock_get_posts.return_value = [ + {"url": "https://user1.com/post1"}, + {"url": "https://user1.com/post2"}, + ] + mock_add_post.return_value = True + + add_user_posts( + server, + access_token, + followings, + known_followings, + all_known_users, + seen_urls, + seen_hosts, + ) + + mock_get_posts.assert_called_once_with( + followings[0], known_followings, server, seen_hosts + ) + mock_add_post.assert_not_called() + assert len(seen_urls) == 2 + assert "user1" in known_followings + assert "user1" in all_known_users + + +@pytest.fixture +def mock_functions(): + with patch( + "find_posts.add_context_url", return_value=True + ) as add_context_url, patch( + "find_posts.parse_url", return_value=None + ) as parse_url, patch( + "find_posts.get_all_known_context_urls", return_value=[] + ) as get_all_known_context_urls, patch( + "find_posts.add_context_urls" + ) as add_context_urls: + yield add_context_url, parse_url, get_all_known_context_urls, add_context_urls + + +def test_add_post_with_context_post_not_added(mock_functions): + add_context_url, _, _, _ = mock_functions + add_context_url.return_value = False + + post = {"url": "http://example.com"} + server = "server" + access_token = "access_token" + seen_urls = set() + seen_hosts = set() + + result = find_posts.add_post_with_context( + post, server, access_token, seen_urls, seen_hosts + ) + + add_context_url.assert_called_once_with(post["url"], server, access_token) + + assert result is False + + +def test_user_has_opted_out(): + assert user_has_opted_out({"note": "I love robots"}) == False + assert user_has_opted_out({"note": "I love robots, nobot"}) == True + assert user_has_opted_out({"note": "/tags/nobot"}) == True + assert user_has_opted_out({"indexable": False}) == True + assert user_has_opted_out({"discoverable": False}) == True + + +@pytest.fixture +def webserver(): + return "server.com" + + +@pytest.fixture +def userName(): + return "test_user" + + +def test_get_user_posts_mastodon_success(userName, webserver): + with patch("find_posts.get_user_id") as mock_get_user_id, patch( + "find_posts.get" + ) as mock_get: + + # Mocking get_user_id + mock_get_user_id.return_value = 1234 + + # Mocking get function call + mock_response = Response() + mock_response.status_code = 200 + mock_response._content = b'{"data": "Test"}' + mock_get.return_value = mock_response + + result = get_user_posts_mastodon(userName, webserver) + assert result == {"data": "Test"} + + +def test_get_user_posts_mastodon_user_not_found(userName, webserver): + with patch("find_posts.get_user_id") as mock_get_user_id, patch( + "find_posts.get" + ) as mock_get: + + # Mocking get_user_id + mock_get_user_id.return_value = 1234 + + # Mocking get function call + mock_response = Response() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + result = get_user_posts_mastodon(userName, webserver) + assert result == None + + +def test_get_user_posts_mastodon_error_status_code(userName, webserver): + with patch("find_posts.get_user_id") as mock_get_user_id, patch( + "find_posts.get" + ) as mock_get: + + # Mocking get_user_id + mock_get_user_id.return_value = 1234 + + # Mocking get function call + mock_response = Response() + mock_response.status_code = 500 + mock_get.return_value = mock_response + + result = get_user_posts_mastodon(userName, webserver) + assert result == None + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_user_posts_lemmy_community(mock_logger, mock_get): + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"posts": [{"post": {"ap_id": "test_url"}}]} + mock_get.return_value = mock_response + + result = find_posts.get_user_posts_lemmy( + "test_user", "https://test.com/c/test_user", "test.com" + ) + + assert result == [{"ap_id": "test_url", "url": "test_url"}] + mock_get.assert_called_once_with( + "https://test.com/api/v3/post/list?community_name=test_user&sort=New&limit=50" + ) + mock_logger.error.assert_not_called() + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_user_posts_lemmy_user(mock_logger, mock_get): + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "posts": [{"post": {"ap_id": "post_url"}}], + "comments": [{"post": {"ap_id": "comment_url"}}], + } + mock_get.return_value = mock_response + + result = find_posts.get_user_posts_lemmy( + "test_user", "https://test.com/u/test_user", "test.com" + ) + + assert result == [ + {"ap_id": "comment_url", "url": "comment_url"}, + {"ap_id": "post_url", "url": "post_url"}, + ] + mock_get.assert_called_once_with( + "https://test.com/api/v3/user?username=test_user&sort=New&limit=50" + ) + mock_logger.error.assert_not_called() + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_user_posts_peertube(mock_logger, mock_get): + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": "test_data"} + mock_get.return_value = mock_response + + result = find_posts.get_user_posts_peertube("test_user", "test_webserver") + + assert result == "test_data" + mock_get.assert_called_once_with( + "https://test_webserver/api/v1/accounts/test_user/videos" + ) + mock_logger.error.assert_not_called() + + +@patch("find_posts.post") +@patch("find_posts.logger") +def test_get_user_posts_misskey(mock_logger, mock_post): + mock_response = mock_post.return_value + mock_response.status_code = 200 + mock_response.json.return_value = [ + {"host": None, "id": "id1"}, + {"host": "host1", "id": "id2"}, + ] + + result = get_user_posts_misskey("username", "webserver") + + mock_post.assert_called_with( + "https://webserver/api/users/notes", {"userId": "id1", "limit": 40} + ) + mock_logger.error.assert_not_called() + assert result is not None + + +@patch("find_posts.get_paginated_mastodon") +@patch("find_posts.filter_known_users") +@patch("find_posts.logger") +def test_get_new_follow_requests( + mock_logger, mock_filter_known_users, mock_get_paginated_mastodon +): + mock_get_paginated_mastodon.return_value = ["request1", "request2"] + mock_filter_known_users.return_value = ["request1"] + + result = get_new_follow_requests("server", "access_token", 10, ["known_following"]) + + mock_get_paginated_mastodon.assert_called_with( + "https://server/api/v1/follow_requests", + 10, + { + "Authorization": "Bearer access_token", + }, + ) + mock_filter_known_users.assert_called_with( + ["request1", "request2"], ["known_following"] + ) + mock_logger.info.assert_called_with("Got 2 follow_requests, 1 of which are new") + assert result == ["request1"] + + +def test_filter_known_users(): + users = [ + {"acct": "user1"}, + {"acct": "user2"}, + {"acct": "user3"}, + ] + known_users = ["user1", "user3"] + + filtered_users = filter_known_users(users, known_users) + + assert filtered_users == [{"acct": "user2"}] + + +def test_filter_known_users_no_known_users(): + users = [ + {"acct": "user1"}, + {"acct": "user2"}, + {"acct": "user3"}, + ] + known_users = [] + + filtered_users = filter_known_users(users, known_users) + + assert filtered_users == users + + +def test_filter_known_users_all_users_known(): + users = [ + {"acct": "user1"}, + {"acct": "user2"}, + {"acct": "user3"}, + ] + known_users = ["user1", "user2", "user3"] + + filtered_users = filter_known_users(users, known_users) + + assert filtered_users == [] + + +def test_filter_known_users_no_users(): + users = [] + known_users = ["user1", "user2", "user3"] + + filtered_users = filter_known_users(users, known_users) + + assert filtered_users == [] + + +@patch("find_posts.get_paginated_mastodon") +@patch("find_posts.filter_known_users") +@patch("find_posts.logger") +def test_get_new_followers( + mock_logger, mock_filter_known_users, mock_get_paginated_mastodon +): + mock_get_paginated_mastodon.return_value = ["follower1", "follower2", "follower3"] + mock_filter_known_users.return_value = ["follower2", "follower3"] + + server = "server" + user_id = 1 + max = 50 + known_followers = ["follower1"] + + expected_result = ["follower2", "follower3"] + result = find_posts.get_new_followers(server, user_id, max, known_followers) + + mock_get_paginated_mastodon.assert_called_once_with( + f"https://{server}/api/v1/accounts/{user_id}/followers", max + ) + mock_filter_known_users.assert_called_once_with( + ["follower1", "follower2", "follower3"], known_followers + ) + mock_logger.info.assert_called_once_with("Got 3 followers, 2 of which are new") + + assert result == expected_result + + +@patch("find_posts.get_paginated_mastodon") +@patch("find_posts.filter_known_users") +@patch("find_posts.logger") +def test_get_new_followings( + mock_logger, mock_filter_known_users, mock_get_paginated_mastodon +): + mock_get_paginated_mastodon.return_value = ["user1", "user2", "user3"] + mock_filter_known_users.return_value = ["user1", "user2"] + result = get_new_followings("server", "100", 5, "known_users") + mock_get_paginated_mastodon.assert_called_with( + "https://server/api/v1/accounts/100/following", 5 + ) + mock_filter_known_users.assert_called_with( + ["user1", "user2", "user3"], "known_users" + ) + assert result == ["user1", "user2"] + mock_logger.info.assert_called_with("Got 3 followings, 2 of which are new") + + +@patch("find_posts.get") +def test_get_user_id_with_username(mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "123"} + mock_get.return_value = mock_response + result = get_user_id("server", user="test_user") + mock_get.assert_called_with( + "https://server/api/v1/accounts/lookup?acct=test_user", headers={} + ) + assert result == "123" + + +@patch("find_posts.get") +def test_get_user_id_with_access_token(mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "456"} + mock_get.return_value = mock_response + result = get_user_id("server", access_token="test_token") + mock_get.assert_called_with( + "https://server/api/v1/accounts/verify_credentials", + headers={ + "Authorization": "Bearer test_token", + }, + ) + assert result == "456" + + +def test_get_user_id_with_no_user_or_token(): + with pytest.raises( + Exception, + match="You must supply either a user name or an access token, to get an user ID", + ): + get_user_id("server") + + +@patch("find_posts.get") +def test_get_user_id_with_404_status_code(mock_get): + mock_response = MagicMock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + with pytest.raises( + Exception, match="User test_user was not found on server server." + ): + get_user_id("server", user="test_user") + + +@patch("find_posts.get") +def test_get_user_id_with_non_200_or_404_status_code(mock_get): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_get.return_value = mock_response + with pytest.raises( + Exception, + match=re.escape( + "Error getting URL https://server/api/v1/accounts/lookup?acct=test_user. Status code: 500" + ), + ): + get_user_id("server", user="test_user") + + +@patch("find_posts.get_toots") +def test_get_timeline(mock_get_toots): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = ["toot1", "toot2", "toot3"] + mock_response.links = {} + mock_get_toots.return_value = mock_response + + timeline = find_posts.get_timeline("server", "token", 5) + + mock_get_toots.assert_any_call("https://server/api/v1/timelines/home", "token") + assert len(timeline) == 3 + + +@patch("find_posts.get", autospec=True) +def test_get_reply_toots_error_status_code(mock_get): + mock_resp = Mock() + mock_resp.status_code = 403 + mock_get.return_value = mock_resp + with pytest.raises(Exception) as e_info: + find_posts.get_reply_toots( + "test_user", + "test_server", + "test_token", + ["some_seen_url"], + datetime(2020, 1, 1), + ) + assert ( + "Make sure you have the read:statuses scope enabled for your access token." + in str(e_info.value) + ) + + +@patch("find_posts.logger") +def test_toot_context_can_be_fetched_public(mock_logger): + toot = {"visibility": "public", "uri": "sample_uri"} + result = find_posts.toot_context_can_be_fetched(toot) + assert result is True + mock_logger.debug.assert_not_called() + + +@patch("find_posts.logger") +def test_toot_context_can_be_fetched_unlisted(mock_logger): + toot = {"visibility": "unlisted", "uri": "sample_uri"} + result = find_posts.toot_context_can_be_fetched(toot) + assert result is True + mock_logger.debug.assert_not_called() + + +@patch("find_posts.logger") +def test_toot_context_can_be_fetched_private(mock_logger): + toot = {"visibility": "private", "uri": "sample_uri"} + result = find_posts.toot_context_can_be_fetched(toot) + assert result is False + mock_logger.debug.assert_called_once_with( + "Cannot fetch context of private toot sample_uri" + ) + + +toot_with_existing_uri = { + "uri": "existing_uri", + "lastSeen": datetime.now(), + "created_at": datetime.now(), +} + +toot_with_new_uri = { + "uri": "new_uri", + "lastSeen": datetime.now(), + "created_at": datetime.now(), +} + +recently_checked_context = {"existing_uri": toot_with_existing_uri} + + +@patch("find_posts.toot_has_parseable_url") +@patch("find_posts.parse_url") +@patch("find_posts.toot_context_can_be_fetched") +@patch("find_posts.toot_context_should_be_fetched") +@patch("find_posts.get_toot_context") +@patch("find_posts.logger", new_callable=Mock()) +def test_get_all_known_context_urls( + mock_logger, + get_toot_context, + toot_context_should_be_fetched, + toot_context_can_be_fetched, + parse_url, + toot_has_parseable_url, +): + server = "test_server" + reply_toots = [ + {"url": "test_url_1", "reblog": None, "uri": "test_uri_1"}, + {"url": "test_url_2", "reblog": {"url": "reblog_url_2"}, "uri": "test_uri_2"}, + ] + parsed_urls = ["parsed_url_1", "parsed_url_2"] + seen_hosts = ["seen_host_1", "seen_host_2"] + find_posts.recently_checked_context = { + "test_uri_1": {"lastSeen": datetime.now()}, + "test_uri_2": {"lastSeen": datetime.now()}, + } + + toot_has_parseable_url.return_value = True + parse_url.return_value = ["parsed_url", "parsed_url_host"] + toot_context_can_be_fetched.return_value = True + toot_context_should_be_fetched.return_value = True + get_toot_context.return_value = ["context_item_1", "context_item_2"] + + result_urls = find_posts.get_all_known_context_urls( + server, reply_toots, parsed_urls, seen_hosts + ) + + # check if parseable url method called twice and the arguments correct + assert toot_has_parseable_url.call_count == 2 + toot_has_parseable_url.assert_any_call(reply_toots[0], parsed_urls) + toot_has_parseable_url.assert_any_call(reply_toots[1], parsed_urls) + + # check if parse url method was first called with the first toot url then with its reblog url + parse_url.assert_any_call("test_url_1", parsed_urls) + parse_url.assert_any_call("reblog_url_2", parsed_urls) + + # check if format of logger.info message is correct + mock_logger.info.assert_called_once_with("Found 2 known context toots") + + # check if the correct context urls are returned + assert result_urls == {"context_item_1", "context_item_2"} + + +def test_toot_has_parseable_url_with_parseable_url(): + toot = {"url": "http://test.com", "reblog": None} + parsed_urls = [] + with patch("find_posts.parse_url", return_value="something") as mock_parse_url: + assert find_posts.toot_has_parseable_url(toot, parsed_urls) + mock_parse_url.assert_called_once_with("http://test.com", parsed_urls) + + +def test_toot_has_parseable_url_with_unparseable_url(): + toot = {"url": "http://test.com", "reblog": None} + parsed_urls = [] + with patch("find_posts.parse_url", return_value=None) as mock_parse_url: + assert not find_posts.toot_has_parseable_url(toot, parsed_urls) + mock_parse_url.assert_called_once_with("http://test.com", parsed_urls) + + +def test_get_replied_toot_server_id_no_mentions(): + toot = {"in_reply_to_id": "1", "in_reply_to_account_id": "1", "mentions": []} + assert find_posts.get_replied_toot_server_id("server", toot, {}, {}) is None + + +def test_get_replied_toot_server_id_no_url_redirect(): + toot = { + "in_reply_to_id": "1", + "in_reply_to_account_id": "1", + "mentions": [{"id": "1", "acct": "account"}], + } + with patch("find_posts.get_redirect_url", return_value=None): + assert find_posts.get_replied_toot_server_id("server", toot, {}, {}) is None + + +def test_get_replied_toot_server_id_with_url_redirect(): + toot = { + "in_reply_to_id": "1", + "in_reply_to_account_id": "1", + "mentions": [{"id": "1", "acct": "account"}], + } + with patch("find_posts.get_redirect_url", return_value="redirect_url"), patch( + "find_posts.parse_url", return_value="match" + ) as mock_parse: + assert find_posts.get_replied_toot_server_id("server", toot, {}, {}) == ( + "redirect_url", + "match", + ) + mock_parse.assert_called_once_with("redirect_url", {}) + + +def test_get_replied_toot_server_id_with_existing_replied_toot_server_ids(): + toot = { + "in_reply_to_id": "1", + "in_reply_to_account_id": "1", + "mentions": [{"id": "1", "acct": "account"}], + } + replied_toot_server_ids = {"https://server/@account/1": ("url", "match")} + + assert find_posts.get_replied_toot_server_id( + "server", toot, replied_toot_server_ids, {} + ) == ("url", "match") + + +@patch("find_posts.parse_mastodon_profile_url") +@patch("find_posts.parse_pleroma_profile_url") +@patch("find_posts.parse_lemmy_profile_url") +@patch("find_posts.parse_peertube_profile_url") +@patch("find_posts.parse_pixelfed_profile_url") +@patch("find_posts.logger") +def test_parse_user_url( + mock_logger, + mock_parse_pixelfed, + mock_parse_peertube, + mock_parse_lemmy, + mock_parse_pleroma, + mock_parse_mastodon, +): + + url = "test_url" + match_value = "match" + + # Test that the function return a mastodon url when match is not None + mock_parse_mastodon.return_value = match_value + assert find_posts.parse_user_url(url) == match_value + mock_parse_mastodon.assert_called_once_with(url) + + # Test that the function return a pleroma url when mastodon match is None + mock_parse_mastodon.return_value = None + mock_parse_pleroma.return_value = match_value + assert find_posts.parse_user_url(url) == match_value + mock_parse_pleroma.assert_called_once_with(url) + + # Continue similarly for the other urls: lemmy, peertube and pixelfed + mock_parse_pleroma.return_value = None + mock_parse_lemmy.return_value = match_value + assert find_posts.parse_user_url(url) == match_value + mock_parse_lemmy.assert_called_once_with(url) + + mock_parse_lemmy.return_value = None + mock_parse_peertube.return_value = match_value + assert find_posts.parse_user_url(url) == match_value + mock_parse_peertube.assert_called_once_with(url) + + mock_parse_peertube.return_value = None + mock_parse_pixelfed.return_value = match_value + assert find_posts.parse_user_url(url) == match_value + mock_parse_pixelfed.assert_called_once_with(url) + + # Test that function logs an error and returns None when no match is found + mock_parse_pixelfed.return_value = None + assert find_posts.parse_user_url(url) == None + mock_logger.error.assert_called_once_with(f"Error parsing Profile URL {url}") + + +def test_parse_mastodon_profile_url_success(): + url = "https://mastodon.social/@username" + result = parse_mastodon_profile_url(url) + assert result == ("mastodon.social", "username") + + +def test_parse_mastodon_profile_url_not_match(): + url = "https://mastodon.social/username" + result = parse_mastodon_profile_url(url) + assert result == None + + +def test_parse_mastodon_url(): + valid_url = "https://mastodon.social/@user/1234" + invalid_url = "https://twitter.com/user/status/1234" + null_url = None + + # Testing valid mastodon URL + server, toot_id = parse_mastodon_url(valid_url) + assert server == "mastodon.social" + assert toot_id == "1234" + + # Testing invalid URL + assert parse_mastodon_url(invalid_url) is None + + # Testing null URL + with pytest.raises(TypeError): + parse_mastodon_url(null_url) + + +def test_parse_mastodon_uri(): + # Test that a valid URI is correctly parsed + uri = "https://my.server.com/users/testuser/statuses/123456" + assert parse_mastodon_uri(uri) == ("my.server.com", "123456") + + # Test that an invalid URI returns None + uri = "http://invalid.uri.com" + assert parse_mastodon_uri(uri) == None + + # Test that a URI missing elements returns None + uri = "https://missing.elements.com/users/testuser/" + assert parse_mastodon_uri(uri) == None + + # Test that a URI with extra elements returns the correct server and ID + uri = "https://extra.elements.com/users/testuser/statuses/123456/7890" + assert parse_mastodon_uri(uri) == ("extra.elements.com", "123456") + + # Test that a URI with different protocol still works + uri = "http://still.works.com/users/testuser/statuses/123456" + assert parse_mastodon_uri(uri) == None + + # Test that a URI without protocol doesn't work + uri = "nowork/users/testuser/statuses/123456" + assert parse_mastodon_uri(uri) == None + + # Test that a URI without slashes after https:// doesn't work + uri = "https://noworkusers/testuser/statuses/123456" + assert parse_mastodon_uri(uri) == None + + # Test the boundary case of an empty string + uri = "" + assert parse_mastodon_uri(uri) == None + + +@patch("find_posts.get_redirect_url") +def test_parse_pleroma_url(mock_get_redirect_url): + mock_get_redirect_url.return_value = "/notice/123" + + result = parse_pleroma_url("https://example.com/objects/567") + assert result == ("example.com", "123") + + mock_get_redirect_url.return_value = None + result = parse_pleroma_url("https://example.com/objects/567") + assert result is None + + result = parse_pleroma_url("not a url") + assert result is None + + mock_get_redirect_url.return_value = "/different_pattern/123" + result = parse_pleroma_url("https://example.com/objects/567") + assert result is None + + mock_get_redirect_url.return_value = "/notice/789" + result = parse_pleroma_url("https://different.example.com/objects/111") + assert result == ("different.example.com", "789") + + +import re +import pytest +from find_posts import parse_pleroma_profile_url + + +def test_parse_pleroma_profile_url(): + # successful parsing + result = parse_pleroma_profile_url("https://pleroma.server/users/username") + assert result == ("pleroma.server", "username") + + # unsuccessful parsing + result = parse_pleroma_profile_url("http://notvalid/url") + assert result is None + + # url with extra path and query string + result = parse_pleroma_profile_url( + "https://pleroma.server/users/username/extra/path?arg=value" + ) + assert result == ("pleroma.server", "username") + + # url with www + result = parse_pleroma_profile_url("https://www.pleroma.server/users/username") + assert result == ("www.pleroma.server", "username") + + # url without https + result = parse_pleroma_profile_url("http://pleroma.server/users/username") + assert result is None + + +def test_parse_pixelfed_url(): + url = "https://server.com/p/username/post123" + assert parse_pixelfed_url(url) == ("server.com", "post123") + + +def test_parse_pixelfed_url_no_match(): + url = "https://notaurl.com/abc/123" + assert parse_pixelfed_url(url) is None + + +def test_parse_pixelfed_url_malformed(): + url = "malformed url" + assert parse_pixelfed_url(url) is None + + +def test_parse_misskey_url(): + url = "https://misskey.io/notes/837jfe8372" + server, toot_id = parse_misskey_url(url) + assert server == "misskey.io" + assert toot_id == "837jfe8372" + + +def test_parse_misskey_url_no_match(): + url = "https://notamisskeyurl.com" + result = parse_misskey_url(url) + assert result is None + + +def test_parse_misskey_url_incorrect_path(): + url = "https://misskey.io/notnotes/837jfe8372" + result = parse_misskey_url(url) + assert result is None + + +def test_parse_peertube_url_valid(): + # define a valid url + url = "https://example.com/videos/watch/123456789" + + # the expected server and id from the url + expected = ("example.com", "123456789") + + # call the function with the valid url + result = parse_peertube_url(url) + + # assert that the result is as expected + assert result == expected + + +def test_parse_peertube_url_invalid(): + # define an invalid url + url = "https://bad.example.com/watch/123456789" + + # call the function with the invalid url + result = parse_peertube_url(url) + + # assert that the result is None + assert result is None + + +def test_parse_peertube_url_no_match(): + # define a url without a match + url = "https://example.com/videos/123456789" + + # call the function with the url without a match + result = parse_peertube_url(url) + + # assert that the result is None + assert result is None + + +def test_parse_pixelfed_profile_url_success(): + url = "https://pixelfed.server/user.name" + server, username = parse_pixelfed_profile_url(url) + assert server == "pixelfed.server" + assert username == "user.name" + + +def test_parse_pixelfed_profile_url_invalid_url(): + url = "pixelfed.server/user.name" + result = parse_pixelfed_profile_url(url) + assert result is None + + +def test_parse_pixelfed_profile_url_empty_url(): + url = "" + result = parse_pixelfed_profile_url(url) + assert result is None + + +def test_parse_lemmy_url_success(): + url = "https://testserver/post/1234" + + result = parse_lemmy_url(url) + + assert result == ("testserver", "1234") + + +def test_parse_lemmy_url_fail_invalid_url(): + url = "http://testserver/post/1234" + + result = parse_lemmy_url(url) + + assert result == None + + +def test_parse_lemmy_url_fail_no_id(): + url = "https://testserver/post/" + + result = parse_lemmy_url(url) + + assert result == None + + +def test_parse_lemmy_url_fail_no_protocol(): + url = "testserver/post/1234" + + result = parse_lemmy_url(url) + + assert result == None + + +def test_parse_lemmy_profile_url(): + url = "https://my.lemmy.server/u/username" + result = parse_lemmy_profile_url(url) + assert result == ("my.lemmy.server", "username") + + +def test_parse_lemmy_profile_url_no_match(): + url = "http://my.lemmy.server/u/username" + result = parse_lemmy_profile_url(url) + assert result is None + + +def test_parse_lemmy_profile_url_with_community(): + url = "https://my.lemmy.server/c/username" + result = parse_lemmy_profile_url(url) + assert result == ("my.lemmy.server", "username") + + +def test_parse_peertube_profile_url_valid(): + server, username = parse_peertube_profile_url( + "https://myserver.com/accounts/TestUser" + ) + assert server == "myserver.com" + assert username == "TestUser" + + +def test_parse_peertube_profile_url_invalid(): + assert parse_peertube_profile_url("https://invalidurl.com/TestUser") is None + + +def test_parse_peertube_profile_url_none(): + with pytest.raises(TypeError): + parse_peertube_profile_url(None) + + +@patch("find_posts.requests") +@patch("find_posts.logger") +def test_get_redirect_url_success(mock_logger, mock_requests): + response = Response() + response.status_code = 200 + mock_requests.head.return_value = response + assert find_posts.get_redirect_url("https://test.com") == "https://test.com" + mock_logger.error.assert_not_called() + mock_logger.debug.assert_not_called() + + +@patch("find_posts.requests") +@patch("find_posts.logger") +def test_get_redirect_url_redirected(mock_logger, mock_requests): + response = Response() + response.status_code = 302 + response.headers = {"Location": "https://redirected.com"} + mock_requests.head.return_value = response + assert find_posts.get_redirect_url("https://test.com") == "https://redirected.com" + mock_logger.error.assert_not_called() + mock_logger.debug.assert_called_once() + + +@patch("find_posts.requests") +@patch("find_posts.logger") +def test_get_redirect_url_error_status_code(mock_logger, mock_requests): + response = Response() + response.status_code = 500 + mock_requests.head.return_value = response + assert find_posts.get_redirect_url("https://test.com") is None + mock_logger.error.assert_called_once() + mock_logger.debug.assert_not_called() + + +@patch("find_posts.requests") +@patch("find_posts.logger") +def test_get_redirect_url_exception(mock_logger, mock_requests): + mock_requests.head.side_effect = requests.exceptions.RequestException + assert find_posts.get_redirect_url("https://test.com") is None + mock_logger.error.assert_called_once() + mock_logger.debug.assert_not_called() + + +@patch("find_posts.get_server_info") +@patch("find_posts.logger") +def test_get_toot_context_no_server_info(mock_logger, mock_server_info): + mock_server_info.return_value = None + assert get_toot_context("server1", "toot1", "url1", {}) == [] + mock_logger.error.assert_called_once_with("server server1 not found for post") + + +@pytest.fixture +def mock_response_success(): + return_value = MagicMock() + return_value.status_code = 200 + return_value.json.return_value = { + "ancestors": [{"url": "https://abc.com/statuses/123456"}], + "descendants": [{"url": "https://abc.com/statuses/789012"}], + } + return return_value + + +@pytest.fixture +def mock_response_fail(): + return_value = MagicMock() + return_value.status_code = 404 + return return_value + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_mastodon_urls_request_fail(mock_logger, mock_get, mock_response_fail): + mock_get.return_value = mock_response_fail + + result = find_posts.get_mastodon_urls( + "abc.com", "123456", "https://abc.com/statuses/123456" + ) + + assert list(result) == [] + mock_logger.error.assert_called_once() + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_mastodon_urls_exception(mock_logger, mock_get): + mock_get.side_effect = Exception("Test exception") + + result = find_posts.get_mastodon_urls( + "abc.com", "123456", "https://abc.com/statuses/123456" + ) + + assert list(result) == [] + mock_logger.error.assert_called_once() + + +@patch("find_posts.get_lemmy_comment_context") +@patch("find_posts.get_lemmy_comments_urls") +@patch("find_posts.logger") +def test_get_lemmy_urls_comment( + mock_logger, mock_get_lemmy_comments_urls, mock_get_lemmy_comment_context +): + webserver = "webserver" + toot_id = "toot_id" + toot_url = "/comment/" + + get_lemmy_urls(webserver, toot_id, toot_url) + + mock_get_lemmy_comment_context.assert_called_once_with(webserver, toot_id, toot_url) + mock_logger.error.assert_not_called() + + +@patch("find_posts.get_lemmy_comment_context") +@patch("find_posts.get_lemmy_comments_urls") +@patch("find_posts.logger") +def test_get_lemmy_urls_post( + mock_logger, mock_get_lemmy_comments_urls, mock_get_lemmy_comment_context +): + webserver = "webserver" + toot_id = "toot_id" + toot_url = "/post/" + + get_lemmy_urls(webserver, toot_id, toot_url) + + mock_get_lemmy_comments_urls.assert_called_once_with(webserver, toot_id, toot_url) + mock_logger.error.assert_not_called() + + +@patch("find_posts.get_lemmy_comment_context") +@patch("find_posts.get_lemmy_comments_urls") +@patch("find_posts.logger") +def test_get_lemmy_urls_else( + mock_logger, mock_get_lemmy_comments_urls, mock_get_lemmy_comment_context +): + webserver = "webserver" + toot_id = "toot_id" + toot_url = "/else/" + + result = get_lemmy_urls(webserver, toot_id, toot_url) + + assert result == [] + mock_get_lemmy_comments_urls.assert_not_called() + mock_get_lemmy_comment_context.assert_not_called() + mock_logger.error.assert_called_once_with(f"unknown lemmy url type {toot_url}") + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_lemmy_comment_context_get_fail(mock_logger, mock_get): + mock_get.side_effect = Exception + + assert ( + get_lemmy_comment_context("webserver.com", "test_toot_id", "test_toot_url") + == [] + ) + + mock_get.assert_called_once_with( + "https://webserver.com/api/v3/comment?id=test_toot_id" + ) + mock_logger.error.assert_called_once() + + +@patch("find_posts.get") +@patch("find_posts.logger") +def test_get_lemmy_comment_context_parse_fail(mock_logger, mock_get): + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = {"invalid_key": "invalid_value"} + + assert ( + get_lemmy_comment_context("webserver.com", "test_toot_id", "test_toot_url") + == [] + ) + + mock_get.assert_called_once_with( + "https://webserver.com/api/v3/comment?id=test_toot_id" + ) + mock_logger.error.assert_called_once() + + +def test_get_peertube_urls_success(): + with patch("find_posts.get") as mock_get: + mock_resp = Response() + mock_resp.status_code = 200 + mock_resp._content = json.dumps( + {"data": [{"url": "http://example.com/1"}, {"url": "http://example.com/2"}]} + ).encode("utf-8") + + mock_get.return_value = mock_resp + + urls = get_peertube_urls("example.com", "123", "http://toot_url.com") + mock_get.assert_called_once_with( + "https://example.com/api/v1/videos/123/comment-threads" + ) + assert urls == ["http://example.com/1", "http://example.com/2"] + + +@patch("find_posts.logger") +def test_get_peertube_urls_exception(mock_logger): + with patch("find_posts.get") as mock_get: + mock_get.side_effect = Exception("Test exception") + + urls = get_peertube_urls("example.com", "123", "http://toot_url.com") + mock_get.assert_called_once_with( + "https://example.com/api/v1/videos/123/comment-threads" + ) + mock_logger.error.assert_called_once_with( + "Error getting comments on video 123 from http://toot_url.com. Exception: Test exception" + ) + assert urls == [] + + +def test_get_misskey_urls_success(): + with patch("find_posts.post") as mock_post, patch( + "find_posts.logger" + ) as mock_logger: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = [{"id": "123"}, {"id": "456"}] + mock_post.return_value = mock_response + result = get_misskey_urls("testserver", "1", "testurl") + expected = [ + "https://testserver/notes/123", + "https://testserver/notes/456", + "https://testserver/notes/123", + "https://testserver/notes/456", + ] + assert result == expected + assert mock_post.call_count == 2 + assert mock_logger.debug.call_count == 2 + + +def test_get_misskey_urls_post_error(): + with patch("find_posts.post") as mock_post, patch( + "find_posts.logger" + ) as mock_logger: + mock_post.side_effect = Exception("Error") + result = get_misskey_urls("testserver", "1", "testurl") + expected = [] + assert result == expected + assert mock_post.call_count == 1 + assert mock_logger.error.call_count == 1 + + +def test_get_misskey_urls_non_200_response(): + with patch("find_posts.post") as mock_post, patch( + "find_posts.logger" + ) as mock_logger: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_post.return_value = mock_response + result = get_misskey_urls("testserver", "1", "testurl") + expected = [] + assert result == expected + assert mock_logger.error.called + + +def test_get_misskey_urls_json_error(): + with patch("find_posts.post") as mock_post, patch( + "find_posts.logger" + ) as mock_logger: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.side_effect = Exception("JSON Error") + mock_post.return_value = mock_response + result = get_misskey_urls("testserver", "1", "testurl") + expected = [] + assert result == expected + assert mock_post.call_count == 2 + assert mock_logger.error.call_count == 2 + + +@patch("find_posts.add_context_url", return_value=False) +@patch("find_posts.logger") +def test_add_context_urls_all_fail(mock_logger, mock_add_context_url): + server = "test_server" + access_token = "test_token" + context_urls = ["url1", "url2", "url3", "url4"] + seen_urls = set() + + result = add_context_urls(server, access_token, context_urls, seen_urls) + + assert mock_add_context_url.call_count == 4 + assert len(seen_urls) == 0 + assert ( + mock_logger.info.call_args[0][0] + == "Added 0 new context toots (with 4 failures)" + ) + + +@patch("find_posts.add_context_url", return_value=True) +@patch("find_posts.logger") +def test_add_context_urls_all_success(mock_logger, mock_add_context_url): + server = "test_server" + access_token = "test_token" + context_urls = ["url1", "url2", "url3", "url4"] + seen_urls = set() + + result = add_context_urls(server, access_token, context_urls, seen_urls) + + assert mock_add_context_url.call_count == 4 + assert len(seen_urls) == 4 + assert "url1" in seen_urls + assert "url2" in seen_urls + assert "url3" in seen_urls + assert "url4" in seen_urls + assert ( + mock_logger.info.call_args[0][0] + == "Added 4 new context toots (with 0 failures)" + ) + + +class MockResponse: + def __init__(self, status_code, links=None, json_data=None): + self.status_code = status_code + self.links = links + self.json_data = json_data + + def json(self): + return self.json_data + + +def test_add_context_url(): + with patch("find_posts.get", return_value=MockResponse(200)) as mocked_get: + result = find_posts.add_context_url("test-url", "test-server", "test-token") + assert result + mocked_get.assert_called_once() + assert ( + mocked_get.call_args[0][0] + == "https://test-server/api/v2/search?q=test-url&resolve=true&limit=1" + ) + + with patch("find_posts.get", return_value=MockResponse(403)) as mocked_get: + result = find_posts.add_context_url("test-url", "test-server", "test-token") + assert not result + + +def test_get_paginated_mastodon(): + json_data = [{"created_at": "2022-02-18T05:31:00.000Z"} for _ in range(10)] + with patch( + "find_posts.get", return_value=MockResponse(200, json_data=json_data) + ) as mocked_get: + result = find_posts.get_paginated_mastodon("test-url", 10) + assert len(result) == 10 + mocked_get.assert_called_once() + + with patch("find_posts.get", return_value=MockResponse(401)) as mocked_get: + with pytest.raises(Exception): + find_posts.get_paginated_mastodon("test-url", 10) + + with patch("find_posts.get", return_value=MockResponse(403)) as mocked_get: + with pytest.raises(Exception): + find_posts.get_paginated_mastodon("test-url", 10) + + with patch("find_posts.get", return_value=MockResponse(500)) as mocked_get: + with pytest.raises(Exception): + find_posts.get_paginated_mastodon("test-url", 10) + + +def test_get_cached_robots_cached(): + find_posts.ROBOTS_TXT = {"test_url": "test_robots_txt"} + assert find_posts.get_cached_robots("test_url") == "test_robots_txt" + + +@patch("find_posts.get_robots_txt_cache_path", return_value="test_cache_path") +def test_get_cached_robots_no_cache(mock_get_path): + find_posts.ROBOTS_TXT = {} + assert find_posts.get_cached_robots("test_url") is None + + +@patch("find_posts.get_cached_robots", return_value="test_robots_txt") +def test_get_robots_from_url_cached(mock_get_cached_robots): + assert find_posts.get_robots_from_url("test_url") == "test_robots_txt" + + +@patch("find_posts.get") +@patch("find_posts.get_cached_robots", return_value=None) +def test_get_robots_from_url_exception(mock_get_cached_robots, mock_get): + mock_get.side_effect = Exception + find_posts.ROBOTS_TXT = {} + assert find_posts.get_robots_from_url("test_url") is True + assert find_posts.ROBOTS_TXT["test_url"] is True + + +@patch("find_posts.get_robots_from_url") +@patch("urllib.robotparser.RobotFileParser") +def test_can_fetch(mock_robotFileParser, mock_get_robots_from_url): + test_url = "http://test.com" + test_user_agent = "test_agent" + + # Prepare mocks + mock_robotsTxt = MagicMock() + mock_robotParser = MagicMock() + + # Mock return values + mock_get_robots_from_url.return_value = mock_robotsTxt + mock_robotFileParser.return_value = mock_robotParser + + mock_robotsTxt.splitlines.return_value = "User-agent: *\nDisallow: /" + mock_robotParser.can_fetch.return_value = True + + # Call function + result = find_posts.can_fetch(test_user_agent, test_url) + + # Check calls and results + mock_get_robots_from_url.assert_called_once_with( + "{uri.scheme}://{uri.netloc}/robots.txt".format(uri=parse.urlparse(test_url)) + ) + mock_robotParser.parse.assert_called_once_with(mock_robotsTxt.splitlines()) + mock_robotParser.can_fetch.assert_called_once_with(test_user_agent, test_url) + assert result is True + + # Testing when get_robots_from_url return bool type + mock_get_robots_from_url.return_value = True + result = find_posts.can_fetch(test_user_agent, test_url) + assert result is True + + +@pytest.fixture +def headers(): + return {"User-Agent": "test-agent"} + + +@pytest.fixture +def url(): + return "http://test.com" + + +@patch("find_posts.requests") +def test_robots_txt_prohibited(mock_requests, headers, url): + with patch("find_posts.can_fetch") as mock_can_fetch: + mock_can_fetch.return_value = False + with pytest.raises(Exception) as exc_info: + get(url, headers) + + assert "prohibited by robots.txt" in str(exc_info.value) + mock_can_fetch.assert_called_once_with(headers["User-Agent"], url) + + +@patch("find_posts.requests") +@patch("find_posts.can_fetch") +@patch("find_posts.user_agent") +@patch("find_posts.logger") +def test_post_success(mock_logger, mock_user_agent, mock_can_fetch, mock_requests): + url = "http://testurl.com" + mock_json = {"key": "value"} + headers = {"User-Agent": "test_agent"} + timeout = 2 + mock_user_agent.return_value = "test_agent" + mock_can_fetch.return_value = True + mock_requests.post.return_value.status_code = 200 + + post(url, mock_json, headers, timeout) + + mock_requests.post.assert_called_once_with( + url, json=mock_json, headers=headers, timeout=timeout + ) + + +@patch("find_posts.requests") +@patch("find_posts.can_fetch") +@patch("find_posts.user_agent") +@patch("find_posts.logger") +def test_post_rate_limit(mock_logger, mock_user_agent, mock_can_fetch, mock_requests): + url = "http://testurl.com" + mock_json = {"key": "value"} + headers = {"User-Agent": "test_agent"} + timeout = 2 + mock_user_agent.return_value = "test_agent" + mock_can_fetch.return_value = True + response = Mock() + response.status_code = 429 + response.headers = {"x-ratelimit-reset": "1900-01-01 01:00:00"} + mock_requests.post.return_value = response + + with pytest.raises(Exception): + post(url, mock_json, headers, timeout) + + +@patch("find_posts.requests") +@patch("find_posts.can_fetch") +@patch("find_posts.user_agent") +@patch("find_posts.logger") +def test_post_robotstxt_disallowed( + mock_logger, mock_user_agent, mock_can_fetch, mock_requests +): + url = "http://testurl.com" + mock_json = {"key": "value"} + headers = {} + mock_user_agent.return_value = "test_agent" + mock_can_fetch.return_value = False + + with pytest.raises(Exception): + post(url, mock_json, headers) + + +@patch("find_posts.get", autospec=True) +@patch("find_posts.ET.fromstring", autospec=True) +@patch("find_posts.logger", autospec=True) +def test_get_server_from_host_meta(mock_logger, mock_parse, mock_get): + server = "dummy-server" + result = "result" + + # Happy Path + mock_response = Mock() + mock_response.status_code = 200 + mock_response.text = "dummy text" + mock_get.return_value = mock_response + mock_parse.return_value.find.return_value.get.return_value = f"https://{result}/" + assert find_posts.get_server_from_host_meta(server) == result + mock_get.assert_called_once_with( + f"https://{server}/.well-known/host-meta", timeout=30 + ) + mock_parse.assert_called_once_with(mock_response.text) + mock_logger.error.assert_not_called() + + # Case when get(url) call throws an Exception + mock_get.side_effect = Exception("mocked exception") + assert find_posts.get_server_from_host_meta(server) is None + mock_logger.error.assert_called_once() + + # Case when status code is not 200 + mock_response.status_code = 404 + mock_get.side_effect = None + assert find_posts.get_server_from_host_meta(server) is None + mock_logger.error.assert_called() + + # Case when parsing fails + mock_response.status_code = 200 + mock_parse.side_effect = Exception("mocked exception") + assert find_posts.get_server_from_host_meta(server) is None + mock_logger.error.assert_called() + + # Case when matching fails + mock_parse.side_effect = None + mock_parse.return_value.find.return_value.get.return_value = "malformed url" + assert find_posts.get_server_from_host_meta(server) is None + mock_logger.error.assert_called() + + +@patch("find_posts.get", side_effect=Exception("Mock Exception")) +@patch("find_posts.logger") +def test_get_nodeinfo_get_exception(mock_logger, mock_get): + response = find_posts.get_nodeinfo("test_server", {}) + mock_logger.error.assert_called() + assert response is None + + +@patch("find_posts.get", return_value=Mock(status_code=404)) +@patch("find_posts.get_server_from_host_meta", return_value="new_server") +@patch("find_posts.logger") +def test_get_nodeinfo_404_status_no_fallback(mock_logger, mock_get_server, mock_get): + response = find_posts.get_nodeinfo("test_server", {}) + mock_logger.debug.assert_called() + assert response is None + + +@patch("find_posts.get", return_value=Mock(status_code=200)) +@patch("find_posts.logger") +def test_get_nodeinfo_200_status_no_links(mock_logger, mock_get): + mock_get.return_value.json.return_value = {"links": []} + response = find_posts.get_nodeinfo("test_server", {}) + mock_logger.error.assert_called() + assert response is None + + +@patch("find_posts.get", return_value=Mock(status_code=404)) +@patch("find_posts.logger") +def test_get_nodeinfo_404_status(mock_logger, mock_get): + mock_get.return_value.json.return_value = { + "links": [ + { + "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0", + "href": "http://test.com", + } + ] + } + response = find_posts.get_nodeinfo("test_server", {}) + mock_logger.error.assert_called() + assert response is None + + +def test_set_server_apis(): + # mock server data + server = { + "software": "mastodon", + "rawnodeinfo": {"metadata": {"features": ["mastodon_api"]}}, + } + + # call the function to test + set_server_apis(server) + + # check APIs support + assert server["mastodonApiSupport"] == True + assert server["misskeyApiSupport"] == False + assert server["lemmyApiSupport"] == False + assert server["peertubeApiSupport"] == False + + # check if 'last_checked' is updated + assert isinstance(server["last_checked"], datetime) + + +def test_set_server_apis_without_metadata(): + # mock server data + server = {"software": "mastodon", "rawnodeinfo": {}} + + # call the function to test + set_server_apis(server) + + # check APIs support + assert server["mastodonApiSupport"] == True + assert server["misskeyApiSupport"] == False + assert server["lemmyApiSupport"] == False + assert server["peertubeApiSupport"] == False + + # check if 'last_checked' is updated + assert isinstance(server["last_checked"], datetime) + + +def test_set_server_apis_with_unknown_software(): + # mock server data + server = { + "software": "unknown", + "rawnodeinfo": {"metadata": {"features": ["unknown_feature"]}}, + } + + # call the function to test + set_server_apis(server) + + # check APIs support + assert server["mastodonApiSupport"] == False + assert server["misskeyApiSupport"] == False + assert server["lemmyApiSupport"] == False + assert server["peertubeApiSupport"] == False + + # check if 'last_checked' is updated + assert isinstance(server["last_checked"], datetime) + + +@patch("find_posts.get_paginated_mastodon") +def test_get_user_lists(mock_get_paginated_mastodon): + mock_get_paginated_mastodon.return_value = "Test value" + + server = "test-server" + token = "test-token" + expected_url = f"https://{server}/api/v1/lists" + expected_limit = 99 + expected_headers = {"Authorization": f"Bearer {token}"} + + result = find_posts.get_user_lists(server, token) + + mock_get_paginated_mastodon.assert_called_once_with( + expected_url, expected_limit, expected_headers + ) + + assert result == "Test value" + + +@patch("find_posts.get_paginated_mastodon") +@patch("find_posts.logger") +def test_get_list_timeline(mock_logger, mock_get_paginated_mastodon): + # Arrange + server = "mastodon.social" + list_info = {"id": 123, "title": "test_list"} + token = "token12345" + max = 100 + mock_get_paginated_mastodon.return_value = ["post1", "post2"] + + # Act + result = get_list_timeline(server, list_info, token, max) + + # Assert + mock_get_paginated_mastodon.assert_called_once_with( + f"https://{server}/api/v1/timelines/list/{list_info['id']}", + max, + { + "Authorization": f"Bearer {token}", + }, + ) + mock_logger.info.assert_called_once_with( + f"Found {len(mock_get_paginated_mastodon.return_value)} toots in list {list_info['title']}" + ) + assert len(result) == 2 + assert result == ["post1", "post2"] + + +@patch("find_posts.get_paginated_mastodon") +@patch("find_posts.logger") +def test_get_list_users(mock_logger, mock_get_paginated_mastodon): + # define mock values + mock_server = "mock_server" + mock_list = {"id": "mock_id", "title": "mock_title"} + mock_token = "mock_token" + mock_max = 5 + mock_accounts = ["account1", "account2", "account3"] + + # setup expected url + expected_url = f"https://{mock_server}/api/v1/lists/{mock_list['id']}/accounts" + + # Mock the return value of get_paginated_mastodon + mock_get_paginated_mastodon.return_value = mock_accounts + + # Call the function with the mock values + result = get_list_users(mock_server, mock_list, mock_token, mock_max) + + # Assert the function called get_paginated_mastodon with correct arguments + mock_get_paginated_mastodon.assert_called_once_with( + expected_url, mock_max, {"Authorization": f"Bearer {mock_token}"} + ) + + # Assert the function called logger.info with correct arguments + mock_logger.info.assert_called_once_with( + f"Found {len(mock_accounts)} accounts in list {mock_list['title']}" + ) + + # Assert the function returned correct result + assert result == mock_accounts + + +@patch("find_posts.get_all_known_context_urls") +@patch("find_posts.add_context_urls") +@patch("find_posts.add_user_posts") +@patch("find_posts.filter_known_users") +def test_fetch_timeline_context_with_empty_posts( + mock_filter_known_users, + mock_add_user_posts, + mock_add_context_urls, + mock_get_all_known_context_urls, +): + # Arrange + timeline_posts = [] + token, parsed_urls, seen_hosts = "", [], [] + seen_urls, all_known_users, recently_checked_users = [], [], [] + arguments = type("", (), {})() + arguments.server = "server_test" + arguments.backfill_mentioned_users = 0 + + # Act + find_posts.arguments = arguments + find_posts.fetch_timeline_context( + timeline_posts, + token, + parsed_urls, + seen_hosts, + seen_urls, + all_known_users, + recently_checked_users, + ) + + # Assert + mock_get_all_known_context_urls.assert_called_once_with( + arguments.server, timeline_posts, parsed_urls, seen_hosts + ) + mock_add_context_urls.assert_called_once_with( + arguments.server, token, mock_get_all_known_context_urls.return_value, seen_urls + ) + assert not mock_filter_known_users.called + assert not mock_add_user_posts.called