4 Commits

Author SHA1 Message Date
3d58068291 chore(base): Release v0.9.7
All checks were successful
Build and Push Docker Image on Tag / build_and_push (push) Successful in 45s
Create Release / release (push) Successful in 8s
2025-06-02 19:23:11 +02:00
e04c838ede refactor(app): Code structured and grouped 2025-06-02 17:35:55 +02:00
3e1255ccdc fix(main): Fixed error in hashtag display 2025-06-02 17:22:00 +02:00
3bb33ca379 fix(app): Add hashtags to bluesky post 2025-06-02 16:27:49 +02:00
2 changed files with 63 additions and 62 deletions

View File

@ -1,5 +1,5 @@
FROM python:3.11-slim FROM python:3.11-slim
LABEL version="0.9.6" LABEL version="0.9.7"
RUN apt-get update && apt-get install -y curl && apt-get clean && rm -rf /var/lib/apt/lists/* RUN apt-get update && apt-get install -y curl && apt-get clean && rm -rf /var/lib/apt/lists/*

View File

@ -5,6 +5,8 @@ import logging
import requests import requests
import threading import threading
import smtplib import smtplib
import re
import unicodedata
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from io import BytesIO from io import BytesIO
from mastodon import Mastodon from mastodon import Mastodon
@ -16,8 +18,10 @@ from email.mime.multipart import MIMEMultipart
from dateutil import parser as date_parser from dateutil import parser as date_parser
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
# Load environment variables
load_dotenv() load_dotenv()
# Configuration
FEED_URL = os.getenv("FEED_URL") FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt" SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL") MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
@ -27,6 +31,7 @@ BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0)) MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower() POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()
# Logger setup
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
handler = logging.StreamHandler() handler = logging.StreamHandler()
@ -34,7 +39,7 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Healthcheck server
class HealthHandler(BaseHTTPRequestHandler): class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self):
if self.path == "/health": if self.path == "/health":
@ -48,19 +53,18 @@ class HealthHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args): def log_message(self, format, *args):
pass pass
def start_health_server(): def start_health_server():
server = HTTPServer(("0.0.0.0", 8000), HealthHandler) server = HTTPServer(("0.0.0.0", 8000), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True) thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start() thread.start()
logger.info(f"💡 Healthcheck server running on port 8000.") logger.info(f" Healthcheck server running on port 8000.")
# Email helper
def should_send_email(on_success: bool): def should_send_email(on_success: bool):
mode = os.getenv("EMAIL_MODE", "errors").lower() mode = os.getenv("EMAIL_MODE", "errors").lower()
return (mode == "all") or (mode == "errors" and not on_success) return (mode == "all") or (mode == "errors" and not on_success)
def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str: def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str:
color = "#2e7d32" if status == "success" else "#d32f2f" color = "#2e7d32" if status == "success" else "#d32f2f"
bg_color = "#f5f5f5" if status == "success" else "#fff3f3" bg_color = "#f5f5f5" if status == "success" else "#fff3f3"
@ -98,7 +102,6 @@ def generate_email_html(status: str, title: str, link: str, error_message: str =
</html> </html>
""" """
def send_status_email(subject, html_content): def send_status_email(subject, html_content):
try: try:
smtp_host = os.getenv("SMTP_HOST") smtp_host = os.getenv("SMTP_HOST")
@ -123,6 +126,33 @@ def send_status_email(subject, html_content):
except Exception as e: except Exception as e:
logger.error(f"❌ Error sending email: {e}") logger.error(f"❌ Error sending email: {e}")
# Utility functions
def extract_facets_utf8(text: str):
    """Build rich-text facet dicts for the hashtags and links found in *text*.

    Each facet carries an ``index`` with ``byteStart``/``byteEnd`` offsets
    measured in UTF-8 bytes (not characters) and a single ``features`` entry
    of type ``app.bsky.richtext.facet#tag`` or ``app.bsky.richtext.facet#link``.

    Args:
        text: The full post text that the facets will annotate.

    Returns:
        A list of facet dicts: all hashtag facets first (in order of
        appearance), followed by all link facets (in order of appearance).
        Empty when *text* contains neither.
    """
    facets = []

    def get_byte_range(char_start, char_end):
        # Offsets must be expressed in UTF-8 bytes, so measure the encoded
        # length of the prefix and of the matched slice.
        byte_start = len(text[:char_start].encode("utf-8"))
        byte_end = byte_start + len(text[char_start:char_end].encode("utf-8"))
        return byte_start, byte_end

    # A hashtag must start the text or follow whitespace; otherwise URL
    # fragments ("https://site/page#frag") and mid-word "#" would be tagged
    # and overlap the link facet covering the same bytes.
    for match in re.finditer(r"(?<!\S)#(\w+)", text):
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": match.group(1)}]
        })

    for match in re.finditer(r"https?://\S+", text):
        # Trailing sentence punctuation ("... see https://x.y.") is not part
        # of the URL; drop it from both the uri and the annotated range.
        url = match.group(0).rstrip(".,;:!?")
        char_start = match.start()
        byte_start, byte_end = get_byte_range(char_start, char_start + len(url))
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#link", "uri": url}]
        })

    return facets
def load_seen_ids(): def load_seen_ids():
os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True) os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True)
@ -131,12 +161,10 @@ def load_seen_ids():
with open(SEEN_POSTS_FILE, "r") as f: with open(SEEN_POSTS_FILE, "r") as f:
return set(line.strip() for line in f) return set(line.strip() for line in f)
def save_seen_id(post_id): def save_seen_id(post_id):
with open(SEEN_POSTS_FILE, "a") as f: with open(SEEN_POSTS_FILE, "a") as f:
f.write(post_id + "\n") f.write(post_id + "\n")
def post_to_mastodon(title, link, tags): def post_to_mastodon(title, link, tags):
mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL) mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
hashtags = " ".join(f"#{tag}" for tag in tags) if tags else "" hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
@ -145,7 +173,6 @@ def post_to_mastodon(title, link, tags):
message += f"\n\n{hashtags}" message += f"\n\n{hashtags}"
mastodon.toot(message) mastodon.toot(message)
def fetch_og_data(url): def fetch_og_data(url):
try: try:
resp = requests.get(url, timeout=10) resp = requests.get(url, timeout=10)
@ -160,56 +187,45 @@ def fetch_og_data(url):
logger.error(f"❌ Error fetching OG data: {e}") logger.error(f"❌ Error fetching OG data: {e}")
return None, None return None, None
def post_to_bluesky(title, link, tags):
def post_to_bluesky(message, link):
client = Client() client = Client()
client.login(BSKY_HANDLE, BSKY_PASSWORD) client.login(BSKY_HANDLE, BSKY_PASSWORD)
title, image_url = fetch_og_data(link) hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
text = title or message message = f"{title}\n\n{link}"
if hashtags:
message += f"\n\n{hashtags}"
if title and image_url: facets = extract_facets_utf8(message)
try:
try:
og_title, image_url = fetch_og_data(link)
if og_title and image_url:
embed = { embed = {
"$type": "app.bsky.embed.external", "$type": "app.bsky.embed.external",
"external": { "external": {
"uri": link, "uri": link,
"title": title, "title": title,
"description": "", "description": "",
"thumb": { "thumb": {"$type": "blob", "ref": None, "mimeType": "", "size": 0}
"$type": "blob",
"ref": None,
"mimeType": "",
"size": 0
}
} }
} }
img_resp = requests.get(image_url, timeout=10) img_resp = requests.get(image_url, timeout=10)
img_resp.raise_for_status() img_resp.raise_for_status()
blob = client.upload_blob(BytesIO(img_resp.content)) blob = client.upload_blob(BytesIO(img_resp.content))
embed["external"]["thumb"] = blob.blob embed["external"]["thumb"] = blob.blob
client.send_post(text=message, embed=embed, facets=facets)
client.send_post(text=text, embed=embed)
logger.info(f"✅ Posted to Bluesky with preview.") logger.info(f"✅ Posted to Bluesky with preview.")
return return
except Exception as e: except Exception as e:
logger.error(f"❌ Error uploading preview to Bluesky: {e}") logger.error(f"❌ Error uploading preview to Bluesky: {e}")
client.send_post(f"{text}\n{link}") client.send_post(text=message, facets=facets)
logger.info(f"💡 Posted to Bluesky without preview.") logger.info(f"💡 Posted to Bluesky without preview.")
def extract_post_date(entry): def extract_post_date(entry):
date_fields = [ date_fields = [entry.get(k) for k in ("published", "updated", "date_published", "date_modified", "pubDate")]
entry.get("published"),
entry.get("updated"),
entry.get("date_published"),
entry.get("date_modified"),
entry.get("pubDate")
]
dates = [] dates = []
for d in date_fields: for d in date_fields:
if d: if d:
try: try:
@ -219,10 +235,8 @@ def extract_post_date(entry):
dates.append(dt) dates.append(dt)
except Exception as e: except Exception as e:
logger.warning(f"⚠️ Could not parse date: {d} ({e})") logger.warning(f"⚠️ Could not parse date: {d} ({e})")
return min(dates) if dates else datetime.now(timezone.utc) return min(dates) if dates else datetime.now(timezone.utc)
def main(): def main():
seen_ids = load_seen_ids() seen_ids = load_seen_ids()
feed = feedparser.parse(FEED_URL) feed = feedparser.parse(FEED_URL)
@ -235,11 +249,6 @@ def main():
continue continue
post_date = extract_post_date(entry) post_date = extract_post_date(entry)
age = now - post_date
age_days = age.days
age_hours = age.seconds // 3600
#logger.info(f"Post '{entry.get('title', '').strip()}' is {age_days} days and {age_hours} hours old.")
if post_date < now - max_age: if post_date < now - max_age:
logger.info(f"⏩ Skipping old post ({MAX_POST_AGE_DAYS}+ days): {post_id}") logger.info(f"⏩ Skipping old post ({MAX_POST_AGE_DAYS}+ days): {post_id}")
continue continue
@ -247,15 +256,16 @@ def main():
title = entry.get("title", "").strip() title = entry.get("title", "").strip()
link = entry.get("link", "").strip() link = entry.get("link", "").strip()
def sanitize_tag(tag):
    """Reduce a feed tag to a lowercase ASCII token usable as a hashtag.

    Steps: case-fold, decompose with NFKD, drop every non-ASCII code point
    (which removes combining accents), then strip anything that is not a
    word character (letters, digits, underscore).

    Args:
        tag: Raw tag text from the feed entry.

    Returns:
        The sanitized tag; may be an empty string when nothing survives.
    """
    # casefold() rather than lower(): it expands "ß"/"ẞ" to "ss", which would
    # otherwise be dropped entirely by the ASCII step ("Straße" -> "strae").
    folded = tag.casefold()
    ascii_only = unicodedata.normalize("NFKD", folded).encode("ascii", "ignore").decode("ascii")
    return re.sub(r"\W+", "", ascii_only)
tags = [] tags = []
if "tags" in entry: if "tags" in entry:
tags = [tag["term"] for tag in entry.tags if "term" in tag] raw_tags = [tag.get("term") if isinstance(tag, dict) else getattr(tag, "term", None) for tag in entry.tags]
tags = [sanitize_tag(t) for t in raw_tags if t]
if tags:
hashtags = " ".join(f"#{tag}" for tag in tags)
message = f"{link} {hashtags}"
else:
message = link
logger.info(f"💡 New post found: {title}") logger.info(f"💡 New post found: {title}")
@ -265,34 +275,25 @@ def main():
time.sleep(2) time.sleep(2)
if POST_TARGETS in ("bluesky", "both"): if POST_TARGETS in ("bluesky", "both"):
post_to_bluesky(f"{title}\n{link}", link) post_to_bluesky(title, link, tags)
save_seen_id(post_id) save_seen_id(post_id)
logger.info(f"✅ Post successfully published.") logger.info(f"✅ Post successfully published.")
if should_send_email(on_success=True): if should_send_email(on_success=True):
send_status_email( send_status_email(f"✅ Post published: {title}", generate_email_html("success", title, link))
f"✅ Post published: {title}",
generate_email_html("success", title, link)
)
except Exception as e: except Exception as e:
logger.error(f"❌ Posting failed: {e}") logger.error(f"❌ Posting failed: {e}")
if should_send_email(on_success=False): if should_send_email(on_success=False):
send_status_email( send_status_email(f"❌ Error posting: {title}", generate_email_html("error", title, link, str(e)))
f"❌ Error posting: {title}",
generate_email_html("error", title, link, str(e))
)
time.sleep(5) time.sleep(5)
if __name__ == "__main__": if __name__ == "__main__":
INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30)) INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.") logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.")
start_health_server() start_health_server()
while True: while True:
try: try:
main() main()