"""Cross-post new feed entries to Mastodon and/or Bluesky.

The script polls ``FEED_URL`` at a fixed interval, posts every entry whose id
is not yet recorded in ``SEEN_POSTS_FILE``, optionally sends a status email
per entry, and serves a ``/health`` endpoint on port 8000.
"""

import html
import logging
import os
import re
import smtplib
import threading
import time
import unicodedata
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO

import feedparser
import requests
from atproto import Client
from bs4 import BeautifulSoup
from dateutil import parser as date_parser
from dotenv import load_dotenv
from mastodon import Mastodon

# Load environment variables
load_dotenv()

# Configuration
FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
MASTODON_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_IDENTIFIER")
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Patterns used to build Bluesky rich-text facets.
_HASHTAG_RE = re.compile(r"#(\w+)")
_URL_RE = re.compile(r"https?://[^\s]+")


# Healthcheck server
class HealthHandler(BaseHTTPRequestHandler):
    """Answer ``GET /health`` with ``200 OK`` and any other path with 404."""

    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        # Suppress the default per-request logging of BaseHTTPRequestHandler.
        pass


def start_health_server():
    """Start the healthcheck HTTP server on port 8000 in a daemon thread."""
    server = HTTPServer(("0.0.0.0", 8000), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logger.info("✨ Healthcheck server running on port 8000.")


# Email helper
def should_send_email(on_success: bool):
    """Return whether a status email should be sent for this outcome.

    ``EMAIL_MODE`` (default ``"errors"``) selects the policy: ``"all"`` sends
    for every outcome, ``"errors"`` only for failures, anything else never.
    """
    mode = os.getenv("EMAIL_MODE", "errors").lower()
    return (mode == "all") or (mode == "errors" and not on_success)


def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str:
    """Build the HTML body of a status email.

    Args:
        status: ``"success"`` selects the success wording and colours; any
            other value selects the error variant.
        title: Title of the feed entry.
        link: Link of the feed entry.
        error_message: Optional error text; when given, an "Error:" section
            is included.

    Returns:
        An HTML fragment. ``title``, ``link`` and ``error_message`` originate
        from the feed or from exception text, so they are HTML-escaped before
        being interpolated.
    """
    ok = status == "success"
    color = "#2e7d32" if ok else "#d32f2f"
    bg_color = "#f5f5f5" if ok else "#fff3f3"
    border_color = "#ccc" if ok else "#e57373"
    emoji = "✅" if ok else "❌"
    heading = "Post Published" if ok else "Error Posting Entry"
    meta = (
        "This is an automated success notification."
        if ok
        else "Please check logs or configuration."
    )

    safe_title = html.escape(title or "")
    safe_link = html.escape(link or "", quote=True)

    error_html = ""
    if error_message:
        safe_error = html.escape(error_message)
        error_html = f"""
<p><strong>Error:</strong></p>
<pre style="white-space: pre-wrap; border: 1px solid {border_color}; padding: 8px;">{safe_error}</pre>
"""

    return f"""
<div style="font-family: sans-serif; background: {bg_color}; border: 1px solid {border_color}; padding: 16px;">
<h2 style="color: {color};">{emoji} {heading}</h2>
<p><strong>Title:</strong><br>
{safe_title}</p>
<p><strong>Link:</strong><br>
<a href="{safe_link}">{safe_link}</a></p>
{error_html}
<p style="color: #666; font-size: 12px;">{meta}</p>
</div>
"""


def send_status_email(subject, html_content):
    """Send an HTML status email using the SMTP settings from the environment.

    Reads ``SMTP_HOST``, ``SMTP_PORT`` (default 587), ``SMTP_USER``,
    ``SMTP_PASSWORD``, ``EMAIL_FROM`` and ``EMAIL_TO``. Failures are logged
    and never raised, so a mail problem cannot abort the posting loop.
    """
    try:
        smtp_host = os.getenv("SMTP_HOST")
        smtp_port = int(os.getenv("SMTP_PORT", 587))
        smtp_user = os.getenv("SMTP_USER")
        smtp_password = os.getenv("SMTP_PASSWORD")
        email_from = os.getenv("EMAIL_FROM")
        email_to = os.getenv("EMAIL_TO")

        if not (smtp_host and email_from and email_to):
            logger.error(
                "❌ Error sending email: SMTP_HOST, EMAIL_FROM and EMAIL_TO must be set."
            )
            return

        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = email_from
        msg["To"] = email_to
        msg.attach(MIMEText(html_content, "html"))

        # A timeout keeps an unresponsive mail server from blocking the
        # feed loop indefinitely.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(email_from, email_to, msg.as_string())
        logger.info("✅ Status email sent successfully.")
    except Exception as e:
        logger.error(f"❌ Error sending email: {e}")


# Utility functions
def extract_facets_utf8(text: str):
    """Return Bluesky rich-text facets for the hashtags and links in ``text``.

    Facet indices are expressed as UTF-8 byte offsets (``byteStart`` /
    ``byteEnd``), so character positions from the regex matches are converted
    before being stored. Tag facets come first, followed by link facets.

    A ``#word`` that lies inside a URL (for example a ``#fragment``) is part
    of the link and does not get a tag facet of its own.
    """
    def get_byte_range(char_start, char_end):
        byte_start = len(text[:char_start].encode("utf-8"))
        byte_end = len(text[:char_end].encode("utf-8"))
        return byte_start, byte_end

    url_matches = list(_URL_RE.finditer(text))
    url_spans = [m.span() for m in url_matches]

    facets = []
    for match in _HASHTAG_RE.finditer(text):
        start, _ = match.span()
        if any(u_start <= start < u_end for u_start, u_end in url_spans):
            continue
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": match.group(1)}],
        })

    for match in url_matches:
        byte_start, byte_end = get_byte_range(*match.span())
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{"$type": "app.bsky.richtext.facet#link", "uri": match.group(0)}],
        })

    return facets


def load_seen_ids():
    """Return the set of already-posted ids stored in ``SEEN_POSTS_FILE``.

    The parent directory and the file itself are created when missing.
    Blank lines are ignored.
    """
    os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True)
    # "a+" creates the file when it does not exist; rewind before reading.
    with open(SEEN_POSTS_FILE, "a+", encoding="utf-8") as f:
        f.seek(0)
        return {line.strip() for line in f if line.strip()}


def save_seen_id(post_id):
    """Append ``post_id`` as one line to ``SEEN_POSTS_FILE``."""
    with open(SEEN_POSTS_FILE, "a", encoding="utf-8") as f:
        f.write(post_id + "\n")


def sanitize_tag(tag):
    """Reduce a feed tag to lowercase ASCII word characters.

    Accents are removed via NFKD decomposition and every remaining non-word
    character is dropped. The result may be an empty string (for example for
    a tag written entirely in a non-Latin script).
    """
    tag = tag.lower()
    tag = unicodedata.normalize("NFKD", tag).encode("ascii", "ignore").decode("ascii")
    return re.sub(r"\W+", "", tag)


def _compose_message(title, link, tags):
    """Build the post text: title, blank line, link, then optional hashtags."""
    message = f"{title}\n\n{link}"
    hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
    if hashtags:
        message += f"\n\n{hashtags}"
    return message


def post_to_mastodon(title, link, tags):
    """Publish the entry as a status on the configured Mastodon instance."""
    mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
    mastodon.toot(_compose_message(title, link, tags))


def fetch_og_data(url):
    """Fetch ``url`` and return its ``(og:title, og:image)`` meta contents.

    Each element is ``None`` when the corresponding tag is missing; on any
    error the failure is logged and ``(None, None)`` is returned.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        og_title = soup.find("meta", property="og:title")
        og_image = soup.find("meta", property="og:image")
        title = og_title["content"] if og_title and og_title.has_attr("content") else None
        image_url = og_image["content"] if og_image and og_image.has_attr("content") else None
        return title, image_url
    except Exception as e:
        logger.error(f"❌ Error fetching OG data: {e}")
        return None, None


def post_to_bluesky(title, link, tags):
    """Publish the entry on Bluesky, with a link preview card when possible.

    A preview is attempted only when the target page exposes both an
    ``og:title`` and an ``og:image``. If building or sending the preview
    fails, the error is logged and the post is sent as plain text instead.
    Errors from the plain-text post propagate to the caller.
    """
    client = Client()
    client.login(BSKY_HANDLE, BSKY_PASSWORD)

    message = _compose_message(title, link, tags)
    facets = extract_facets_utf8(message)

    try:
        og_title, image_url = fetch_og_data(link)
        if og_title and image_url:
            img_resp = requests.get(image_url, timeout=10)
            img_resp.raise_for_status()
            blob = client.upload_blob(BytesIO(img_resp.content))
            embed = {
                "$type": "app.bsky.embed.external",
                "external": {
                    "uri": link,
                    "title": title,
                    "description": "",
                    "thumb": blob.blob,
                },
            }
            client.send_post(text=message, embed=embed, facets=facets)
            logger.info("✅ Posted to Bluesky with preview.")
            return
    except Exception as e:
        logger.error(f"❌ Error uploading preview to Bluesky: {e}")

    client.send_post(text=message, facets=facets)
    logger.info("💡 Posted to Bluesky without preview.")


def extract_post_date(entry):
    """Return the earliest parseable date of a feed entry.

    The fields ``published``, ``updated``, ``date_published``,
    ``date_modified`` and ``pubDate`` are considered. Dates without timezone
    information are treated as UTC. When no field can be parsed, the current
    UTC time is returned.
    """
    candidates = (
        entry.get(key)
        for key in ("published", "updated", "date_published", "date_modified", "pubDate")
    )
    dates = []
    for raw in candidates:
        if not raw:
            continue
        try:
            dt = date_parser.parse(raw)
        except (ValueError, OverflowError, TypeError) as e:
            logger.warning(f"⚠️ Could not parse date: {raw} ({e})")
            continue
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dates.append(dt)
    return min(dates) if dates else datetime.now(timezone.utc)


def _extract_tags(entry):
    """Return the entry's sanitized hashtags, without empties or duplicates."""
    if "tags" not in entry:
        return []
    raw_tags = [
        tag.get("term") if isinstance(tag, dict) else getattr(tag, "term", None)
        for tag in entry.tags
    ]
    cleaned = (sanitize_tag(t) for t in raw_tags if t)
    # dict.fromkeys removes duplicates while keeping the feed's tag order.
    return list(dict.fromkeys(t for t in cleaned if t))


def main():
    """Run one feed check: post every new entry and record it as seen.

    Entries are identified by their ``id`` (falling back to ``link``).
    ``MAX_POST_AGE_DAYS`` limits how old an entry may be; a value of 0 or
    less disables the age filter (a zero-day window would otherwise reject
    every entry that carries a date in the past).
    """
    seen_ids = load_seen_ids()
    feed = feedparser.parse(FEED_URL)

    if getattr(feed, "bozo", False) and not feed.entries:
        logger.warning(
            f"⚠️ Feed could not be parsed: {getattr(feed, 'bozo_exception', 'unknown error')}"
        )

    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=MAX_POST_AGE_DAYS) if MAX_POST_AGE_DAYS > 0 else None

    for entry in feed.entries:
        # Stripped so the id matches what load_seen_ids() reads back.
        post_id = (entry.get("id") or entry.get("link") or "").strip()
        if not post_id:
            logger.warning("⚠️ Skipping entry without id or link.")
            continue
        if post_id in seen_ids:
            continue

        if cutoff is not None and extract_post_date(entry) < cutoff:
            logger.info(f"⏩ Skipping old post ({MAX_POST_AGE_DAYS}+ days): {post_id}")
            continue

        title = entry.get("title", "").strip()
        link = entry.get("link", "").strip()
        tags = _extract_tags(entry)

        logger.info(f"💡 New post found: {title}")

        # NOTE(review): with POST_TARGETS="both", a Bluesky failure after a
        # successful Mastodon post leaves the entry unrecorded, so the next
        # run posts it to Mastodon again.
        try:
            if POST_TARGETS in ("mastodon", "both"):
                post_to_mastodon(title, link, tags)
                time.sleep(2)

            if POST_TARGETS in ("bluesky", "both"):
                post_to_bluesky(title, link, tags)

            save_seen_id(post_id)
            # Keep the in-memory set current so a duplicate id later in the
            # same feed is not posted twice.
            seen_ids.add(post_id)
            logger.info("✅ Post successfully published.")

            if should_send_email(on_success=True):
                send_status_email(
                    f"✅ Post published: {title}",
                    generate_email_html("success", title, link),
                )
        except Exception as e:
            logger.error(f"❌ Posting failed: {e}")
            if should_send_email(on_success=False):
                send_status_email(
                    f"❌ Error posting: {title}",
                    generate_email_html("error", title, link, str(e)),
                )

        # Pause between entries.
        time.sleep(5)


if __name__ == "__main__":
    INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
    logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.")
    start_health_server()
    while True:
        try:
            main()
        except Exception as e:
            # Top-level boundary: log with traceback and keep the loop alive.
            logger.exception(f"Unhandled error during execution: {e}")
        logger.info(f"⏳ Waiting {INTERVAL_MINUTES} minutes until next run...")
        time.sleep(INTERVAL_MINUTES * 60)