"""RSS/Atom feed cross-poster for Mastodon and Bluesky.

Polls FEED_URL on an interval, posts each new (and sufficiently recent)
entry to the configured targets, records seen entry IDs on disk, and
optionally sends HTML status emails via SMTP. A tiny HTTP server answers
GET /health for container healthchecks. All settings come from
environment variables (optionally loaded from a .env file).
"""

import logging
import os
import re
import smtplib
import threading
import time
import unicodedata
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO

import feedparser
import requests
from atproto import Client
from bs4 import BeautifulSoup
from dateutil import parser as date_parser
from dotenv import load_dotenv
from mastodon import Mastodon

# Load environment variables from a .env file, if present.
load_dotenv()

# Configuration (all via environment variables)
FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
MASTODON_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_IDENTIFIER")
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
# 0 (the default) disables the age filter; see main(). The previous code
# applied the filter unconditionally, so the default skipped every entry.
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()  # mastodon | bluesky | both

# Logger setup
logger = logging.getLogger()
log_level = os.getenv("LOG_LEVEL", "INFO").upper()  # Enable DEBUG level via env variable
logger.setLevel(getattr(logging, log_level, logging.INFO))
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Configuration overview (INFO level)
logger.info(f"📡 Feed URL: {FEED_URL}")
logger.info(f"📤 Posting targets: {POST_TARGETS}")
logger.info(f"🕒 Max post age: {MAX_POST_AGE_DAYS} days")
logger.info(f"📨 Email mode: {os.getenv('EMAIL_MODE', 'errors')}")
# NOTE(security): this dumps ALL env vars — including API tokens and the
# SMTP password — into the logs. Only emitted at DEBUG level, but treat
# DEBUG logs as sensitive.
logger.debug(f"🛠 Full environment variables: {dict(os.environ)}")  # DEBUG: All environment variables


# Healthcheck server handler
class HealthHandler(BaseHTTPRequestHandler):
    """Minimal handler: 200 "OK" on GET /health, 404 for anything else."""

    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
            logger.debug("🏥 Healthcheck requested and responded OK.")  # DEBUG Healthcheck log
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        # Suppress default HTTP request logging
        pass


def start_health_server():
    """Start the healthcheck HTTP server on port 8000 in a daemon thread."""
    server = HTTPServer(("0.0.0.0", 8000), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logger.info("✨ Healthcheck server running on port 8000.")


# Email helper functions
def should_send_email(on_success: bool) -> bool:
    """Return True if EMAIL_MODE says to mail this outcome.

    EMAIL_MODE: "all" mails every outcome; "errors" (default) mails
    failures only; any other value disables mail.
    """
    mode = os.getenv("EMAIL_MODE", "errors").lower()
    return (mode == "all") or (mode == "errors" and not on_success)


def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str:
    """Build the HTML body for a status email.

    status is "success" or "error"; error_message (optional) is shown in
    an extra details box. NOTE(review): the original markup was lost in a
    formatting mangle (the style variables below were computed but the
    template contained no tags) — this HTML is a minimal reconstruction
    using those same variables; confirm against the intended design.
    """
    color = "#2e7d32" if status == "success" else "#d32f2f"
    bg_color = "#f5f5f5" if status == "success" else "#fff3f3"
    border_color = "#ccc" if status == "success" else "#e57373"
    emoji = "✅" if status == "success" else "❌"
    heading = "Post Published" if status == "success" else "Error Posting Entry"
    meta = "This is an automated success notification." if status == "success" else "Please check logs or configuration."
    error_html = f"""
        <div style="margin-top:12px;padding:10px;border:1px solid {border_color};background:#fff;">
            <strong>Error Details:</strong>
            <pre style="white-space:pre-wrap;">{error_message}</pre>
        </div>
    """ if error_message else ""
    return f"""
    <html>
      <body style="font-family:sans-serif;background:{bg_color};padding:20px;">
        <div style="border:1px solid {border_color};border-radius:8px;padding:16px;">
          <h2 style="color:{color};">{emoji} {heading}</h2>
          <p><strong>Title:</strong><br>{title}</p>
          <p><strong>Link:</strong><br><a href="{link}">{link}</a></p>
          {error_html}
          <p style="color:#777;font-size:12px;">{meta}</p>
        </div>
      </body>
    </html>
    """


def send_status_email(subject, html_content):
    """Send an HTML status email via SMTP (STARTTLS); errors are logged, not raised."""
    try:
        smtp_host = os.getenv("SMTP_HOST")
        smtp_port = int(os.getenv("SMTP_PORT", 587))
        smtp_user = os.getenv("SMTP_USER")
        smtp_password = os.getenv("SMTP_PASSWORD")
        email_from = os.getenv("EMAIL_FROM")
        email_to = os.getenv("EMAIL_TO")
        logger.debug(f"📧 Preparing to send email to {email_to} with subject: {subject}")  # DEBUG
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = email_from
        msg["To"] = email_to
        msg.attach(MIMEText(html_content, "html"))
        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(email_from, email_to, msg.as_string())
        logger.info(f"✅ Status email sent successfully.")
    except Exception:
        logger.exception(f"❌ Error sending email:")  # Full stack trace on error


# Utility functions
def extract_facets_utf8(text: str):
    """Build Bluesky rich-text facets (hashtags and links) for *text*.

    Facet indices must be byte offsets into the UTF-8 encoding of the
    post text, not character offsets — hence the re-encoding below.
    """
    facets = []

    def get_byte_range(char_start, char_end):
        # Convert character span -> UTF-8 byte span (multi-byte chars shift offsets).
        byte_start = len(text[:char_start].encode("utf-8"))
        byte_end = len(text[:char_end].encode("utf-8"))
        return byte_start, byte_end

    # Extract hashtags
    for match in re.finditer(r"#(\w+)", text):
        tag = match.group(1)
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": tag}]
        })
    # Extract links
    for match in re.finditer(r"https?://[^\s]+", text):
        url = match.group(0)
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#link", "uri": url}]
        })
    logger.debug(f"🏷 Extracted facets: {facets}")  # DEBUG
    return facets


def load_seen_ids():
    """Load the set of already-posted entry IDs from SEEN_POSTS_FILE (creating it if absent)."""
    os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True)
    if not os.path.exists(SEEN_POSTS_FILE):
        open(SEEN_POSTS_FILE, "w").close()
    with open(SEEN_POSTS_FILE, "r") as f:
        seen = set(line.strip() for line in f)
    logger.debug(f"🗂 Loaded {len(seen)} seen post IDs.")  # DEBUG
    return seen


def save_seen_id(post_id):
    """Append *post_id* to the seen-posts file so it is never posted again."""
    with open(SEEN_POSTS_FILE, "a") as f:
        f.write(post_id + "\n")
    logger.debug(f"📝 Saved post ID as seen: {post_id}")  # DEBUG


def post_to_mastodon(title, link, tags):
    """Post "title, link, #hashtags" as a Mastodon status."""
    mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
    hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
    message = f"{title}\n\n{link}"
    if hashtags:
        message += f"\n\n{hashtags}"
    logger.debug(f"🐘 Posting to Mastodon: {message}")  # DEBUG
    mastodon.toot(message)
    logger.info("✅ Posted to Mastodon.")


def fetch_og_data(url):
    """Return (og:title, og:image URL) scraped from *url*, or (None, None) on failure."""
    try:
        logger.debug(f"🔍 Fetching OpenGraph data from {url}")  # DEBUG
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        og_title = soup.find("meta", property="og:title")
        og_image = soup.find("meta", property="og:image")
        title = og_title["content"] if og_title and og_title.has_attr("content") else None
        image_url = og_image["content"] if og_image and og_image.has_attr("content") else None
        if not title or not image_url:
            logger.debug(f"⚠️ OpenGraph data incomplete. Title: {title}, Image: {image_url}")  # DEBUG
        return title, image_url
    except Exception:
        logger.exception(f"❌ Error fetching OpenGraph data:")
        return None, None


def post_to_bluesky(title, link, tags):
    """Post to Bluesky with rich-text facets and, when possible, a link-card preview.

    If fetching/uploading the OpenGraph preview image fails, falls back
    to a plain post without the embed.
    """
    client = Client()
    logger.debug(f"🔑 Logging in to Bluesky as {BSKY_HANDLE}")  # DEBUG
    client.login(BSKY_HANDLE, BSKY_PASSWORD)
    hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
    message = f"{title}\n\n{link}"
    if hashtags:
        message += f"\n\n{hashtags}"
    facets = extract_facets_utf8(message)
    try:
        og_title, image_url = fetch_og_data(link)
        if og_title and image_url:
            # Placeholder thumb is overwritten with the uploaded blob below.
            embed = {
                "$type": "app.bsky.embed.external",
                "external": {
                    "uri": link,
                    "title": title,
                    "description": "",
                    "thumb": {"$type": "blob", "ref": None, "mimeType": "", "size": 0}
                }
            }
            logger.debug(f"📸 Attempting to upload preview image from: {image_url}")  # DEBUG
            img_resp = requests.get(image_url, timeout=10)
            img_resp.raise_for_status()
            blob = client.upload_blob(BytesIO(img_resp.content))
            embed["external"]["thumb"] = blob.blob
            client.send_post(text=message, embed=embed, facets=facets)
            logger.info(f"✅ Posted to Bluesky with preview.")
            return
    except Exception:
        logger.exception(f"❌ Error uploading preview to Bluesky:")
    client.send_post(text=message, facets=facets)
    logger.info(f"💡 Posted to Bluesky without preview.")


def extract_post_date(entry):
    """Return the earliest parseable timestamp on *entry* (tz-aware, UTC-defaulted).

    Falls back to "now" when no date field parses, which effectively
    treats undated entries as fresh.
    """
    date_fields = [entry.get(k) for k in ("published", "updated", "date_published", "date_modified", "pubDate")]
    dates = []
    for d in date_fields:
        if d:
            try:
                dt = date_parser.parse(d)
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)  # never compare naive vs aware
                dates.append(dt)
            except Exception as e:
                logger.warning(f"⚠️ Could not parse date: {d} ({e})")
    chosen_date = min(dates) if dates else datetime.now(timezone.utc)
    logger.debug(f"📅 Extracted post date: {chosen_date}")  # DEBUG
    return chosen_date


def _sanitize_tag(tag):
    """Normalize a feed tag: lowercase, strip accents to ASCII, drop non-word chars."""
    tag = tag.lower()
    tag = unicodedata.normalize("NFKD", tag).encode("ascii", "ignore").decode("ascii")
    return re.sub(r"\W+", "", tag)


def main():
    """Run one feed pass: publish every new, recent entry to the configured targets."""
    seen_ids = load_seen_ids()
    feed = feedparser.parse(FEED_URL)
    now = datetime.now(timezone.utc)
    max_age = timedelta(days=MAX_POST_AGE_DAYS)
    logger.debug(f"📰 Number of feed entries found: {len(feed.entries)}")  # DEBUG
    for entry in feed.entries:
        post_id = entry.get("id") or entry.get("link")
        logger.debug(f"🆔 Checking post ID: {post_id}")  # DEBUG
        if post_id in seen_ids:
            logger.debug(f"⏭️ Post already processed: {post_id}")  # DEBUG
            continue
        post_date = extract_post_date(entry)
        # BUG FIX: the age filter is applied only when MAX_POST_AGE_DAYS > 0.
        # The old unconditional check, with the default of 0, skipped every
        # entry that wasn't dated in the future.
        if MAX_POST_AGE_DAYS > 0 and post_date < now - max_age:
            logger.info(f"⏩ Skipping old post (older than {MAX_POST_AGE_DAYS} days): {post_id}")
            continue
        title = entry.get("title", "").strip()
        link = entry.get("link", "").strip()
        tags = []
        if "tags" in entry:
            # feedparser tags may be dicts or objects with a .term attribute.
            raw_tags = [tag.get("term") if isinstance(tag, dict) else getattr(tag, "term", None)
                        for tag in entry.tags]
            tags = [_sanitize_tag(t) for t in raw_tags if t]
            logger.debug(f"🏷 Extracted tags: {tags}")  # DEBUG
        logger.info(f"💡 New post found: {title}")
        try:
            if POST_TARGETS in ("mastodon", "both"):
                post_to_mastodon(title, link, tags)
                time.sleep(2)  # brief pause between targets
            if POST_TARGETS in ("bluesky", "both"):
                post_to_bluesky(title, link, tags)
            # Only marked seen after all targets succeed; a partial failure
            # means the whole entry is retried next pass.
            save_seen_id(post_id)
            logger.info(f"✅ Post successfully published.")
            if should_send_email(on_success=True):
                send_status_email(f"✅ Post published: {title}", generate_email_html("success", title, link))
        except Exception as exc:
            # BUG FIX: was str(Exception) — the *class*, not the caught error —
            # so error emails never contained the actual failure message.
            logger.exception(f"❌ Posting failed for post: {post_id}")
            if should_send_email(on_success=False):
                send_status_email(f"❌ Error posting: {title}", generate_email_html("error", title, link, str(exc)))
            time.sleep(5)


if __name__ == "__main__":
    INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
    logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.")
    start_health_server()
    while True:
        try:
            main()
        except Exception:
            logger.exception("Unhandled error during execution:")
        logger.info(f"⏳ Waiting {INTERVAL_MINUTES} minutes until next run...")
        time.sleep(INTERVAL_MINUTES * 60)