WIP get and show timeline

For now the user is hardcoded in the url, don't want to need an
auth at this stage of the development.
This commit is contained in:
Óscar M. Lage 2024-11-14 18:11:12 +01:00
parent 15337cfe58
commit 38a68dbd3d
5 changed files with 75 additions and 2 deletions

View File

@ -15,8 +15,9 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('', include('core.urls')),
]

View File

@ -0,0 +1,7 @@
{% for publication in publications %}
<div class="post">
<p><strong>{{ publication.author_name }} ({{ publication.author_handle }})</strong> <span>{{ publication.published_at }}</span></p>
<p>{{ publication.content }}</p>
<a href="{{ publication.uri }}" target="_blank">View post</a>
</div>
{% endfor %}

7
src/core/urls.py Normal file
View File

@ -0,0 +1,7 @@
from django.urls import path
from . import views
urlpatterns = [
path('timeline/<int:user_id>/', views.user_timeline, name='user_timeline'),
]

View File

@ -1,3 +1,60 @@
import requests
from django.shortcuts import render
from django.http import JsonResponse
from .models import PublicationCache, User
from django.conf import settings
from django.db import IntegrityError
# Create your views here.
def user_timeline(request, user_id):
try:
user = User.objects.get(id=user_id)
headers = {'Authorization': f"Bearer {user.access_token}"}
timeline_url = f"{user.pds_url}/xrpc/app.bsky.feed.getTimeline"
# Realizamos la solicitud al PDS
response = requests.get(timeline_url, headers=headers)
if response.status_code != 200:
return JsonResponse({"error": "Failed to retrieve timeline"}, status=500)
feed_data = response.json()
posts = feed_data.get('feed', [])
for post_data in posts:
post = post_data.get('post', {})
record = post.get('record', {})
author = post.get('author', {})
text = record.get('text', '')
created_at = record.get('createdAt', '')
post_id = post.get('uri', '')
if post_id:
publication, created = PublicationCache.objects.get_or_create(
id=post_id,
user=user,
defaults={
'content': text,
'author': author.get('handle', ''),
'author_name': author.get('displayName', ''),
'published_at': created_at,
'uri': post.get('uri', ''),
'cid': post.get('cid', ''),
}
)
if not created:
publication.content = text
publication.author = author.get('handle', '')
publication.author_name = author.get('displayName', '')
publication.published_at = created_at
publication.uri = post.get('uri', '')
publication.cid = post.get('cid', '')
publication.save()
publications = PublicationCache.objects.filter(user=user).order_by('-published_at')
return render(request, 'core/user_timeline.html', {'publications': publications})
except User.DoesNotExist:
return JsonResponse({"error": "User not found"}, status=404)
except IntegrityError as e:
return JsonResponse({"error": str(e)}, status=500)

View File

@ -1,3 +1,4 @@
Django==5.1.3
python-decouple==3.8
mysqlclient==2.2.6
requests==2.32.3