Files
BlueMastoFeed/bluemastofeed.py

304 lines
9.8 KiB
Python

import os
import time
import feedparser
import logging
import requests
import threading
import smtplib
from bs4 import BeautifulSoup
from io import BytesIO
from mastodon import Mastodon
from atproto import Client
from dotenv import load_dotenv
from http.server import HTTPServer, BaseHTTPRequestHandler
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dateutil import parser as date_parser
from datetime import datetime, timezone, timedelta
load_dotenv()
FEED_URL = os.getenv("FEED_URL")
SEEN_POSTS_FILE = "/data/seen_posts.txt"
MASTODON_BASE_URL = os.getenv("MASTODON_API_BASE_URL")
MASTODON_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_IDENTIFIER")
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
MAX_POST_AGE_DAYS = int(os.getenv("MAX_POST_AGE_DAYS", 0))
POST_TARGETS = os.getenv("POST_TARGETS", "both").lower()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/health":
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
pass
def start_health_server():
server = HTTPServer(("0.0.0.0", 8000), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
logger.info(f"💡 Healthcheck server running on port 8000.")
def should_send_email(on_success: bool):
mode = os.getenv("EMAIL_MODE", "errors").lower()
return (mode == "all") or (mode == "errors" and not on_success)
def generate_email_html(status: str, title: str, link: str, error_message: str = None) -> str:
color = "#2e7d32" if status == "success" else "#d32f2f"
bg_color = "#f5f5f5" if status == "success" else "#fff3f3"
border_color = "#ccc" if status == "success" else "#e57373"
emoji = "" if status == "success" else ""
heading = "Post Published" if status == "success" else "Error Posting Entry"
meta = "This is an automated success notification." if status == "success" else "Please check logs or configuration."
error_html = f"""
<p><strong>Error:</strong></p>
<div class=\"error\">{error_message}</div>
""" if error_message else ""
return f"""
<html>
<head>
<style>
body {{ font-family: 'Courier New', monospace; background-color: {bg_color}; color: #333; padding: 20px; }}
.container {{ background-color: #ffffff; border: 1px solid {border_color}; border-radius: 8px; padding: 20px; max-width: 600px; margin: auto; }}
h2 {{ color: {color}; }}
a {{ color: #1a73e8; text-decoration: none; }}
.error {{ font-family: monospace; background-color: #fce4ec; padding: 10px; border-radius: 4px; color: #b71c1c; }}
.meta {{ font-size: 14px; color: #777; }}
</style>
</head>
<body>
<div class=\"container\">
<h2>{emoji} {heading}</h2>
<p><strong>Title:</strong><br>{title}</p>
<p><strong>Link:</strong><br><a href=\"{link}\">{link}</a></p>
{error_html}
<p class=\"meta\">{meta}</p>
</div>
</body>
</html>
"""
def send_status_email(subject, html_content):
try:
smtp_host = os.getenv("SMTP_HOST")
smtp_port = int(os.getenv("SMTP_PORT", 587))
smtp_user = os.getenv("SMTP_USER")
smtp_password = os.getenv("SMTP_PASSWORD")
email_from = os.getenv("EMAIL_FROM")
email_to = os.getenv("EMAIL_TO")
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = email_from
msg["To"] = email_to
msg.attach(MIMEText(html_content, "html"))
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_password)
server.sendmail(email_from, email_to, msg.as_string())
logger.info(f"✅ Status email sent successfully.")
except Exception as e:
logger.error(f"❌ Error sending email: {e}")
def load_seen_ids():
os.makedirs(os.path.dirname(SEEN_POSTS_FILE), exist_ok=True)
if not os.path.exists(SEEN_POSTS_FILE):
open(SEEN_POSTS_FILE, "w").close()
with open(SEEN_POSTS_FILE, "r") as f:
return set(line.strip() for line in f)
def save_seen_id(post_id):
with open(SEEN_POSTS_FILE, "a") as f:
f.write(post_id + "\n")
def post_to_mastodon(title, link, tags):
mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_BASE_URL)
hashtags = " ".join(f"#{tag}" for tag in tags) if tags else ""
message = f"{title}\n\n{link}"
if hashtags:
message += f"\n\n{hashtags}"
mastodon.toot(message)
def fetch_og_data(url):
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
og_title = soup.find("meta", property="og:title")
og_image = soup.find("meta", property="og:image")
title = og_title["content"] if og_title and og_title.has_attr("content") else None
image_url = og_image["content"] if og_image and og_image.has_attr("content") else None
return title, image_url
except Exception as e:
logger.error(f"❌ Error fetching OG data: {e}")
return None, None
def post_to_bluesky(message, link):
client = Client()
client.login(BSKY_HANDLE, BSKY_PASSWORD)
title, image_url = fetch_og_data(link)
text = title or message
if title and image_url:
try:
embed = {
"$type": "app.bsky.embed.external",
"external": {
"uri": link,
"title": title,
"description": "",
"thumb": {
"$type": "blob",
"ref": None,
"mimeType": "",
"size": 0
}
}
}
img_resp = requests.get(image_url, timeout=10)
img_resp.raise_for_status()
blob = client.upload_blob(BytesIO(img_resp.content))
embed["external"]["thumb"] = blob.blob
client.send_post(text=text, embed=embed)
logger.info(f"✅ Posted to Bluesky with preview.")
return
except Exception as e:
logger.error(f"❌ Error uploading preview to Bluesky: {e}")
client.send_post(f"{text}\n{link}")
logger.info(f"💡 Posted to Bluesky without preview.")
def extract_post_date(entry):
date_fields = [
entry.get("published"),
entry.get("updated"),
entry.get("date_published"),
entry.get("date_modified"),
entry.get("pubDate")
]
dates = []
for d in date_fields:
if d:
try:
dt = date_parser.parse(d)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
dates.append(dt)
except Exception as e:
logger.warning(f"⚠️ Could not parse date: {d} ({e})")
return min(dates) if dates else datetime.now(timezone.utc)
def main():
seen_ids = load_seen_ids()
feed = feedparser.parse(FEED_URL)
now = datetime.now(timezone.utc)
max_age = timedelta(days=MAX_POST_AGE_DAYS)
for entry in feed.entries:
post_id = entry.get("id") or entry.get("link")
if post_id in seen_ids:
continue
post_date = extract_post_date(entry)
age = now - post_date
age_days = age.days
age_hours = age.seconds // 3600
#logger.info(f"Post '{entry.get('title', '').strip()}' is {age_days} days and {age_hours} hours old.")
if post_date < now - max_age:
logger.info(f"⏩ Skipping old post ({MAX_POST_AGE_DAYS}+ days): {post_id}")
continue
title = entry.get("title", "").strip()
link = entry.get("link", "").strip()
tags = []
if "tags" in entry:
tags = [tag["term"] for tag in entry.tags if "term" in tag]
if tags:
hashtags = " ".join(f"#{tag}" for tag in tags)
message = f"{link} {hashtags}"
else:
message = link
logger.info(f"💡 New post found: {title}")
try:
if POST_TARGETS in ("mastodon", "both"):
post_to_mastodon(title, link, tags)
time.sleep(2)
if POST_TARGETS in ("bluesky", "both"):
post_to_bluesky(f"{title}\n{link}", link)
save_seen_id(post_id)
logger.info(f"✅ Post successfully published.")
if should_send_email(on_success=True):
send_status_email(
f"✅ Post published: {title}",
generate_email_html("success", title, link)
)
except Exception as e:
logger.error(f"❌ Posting failed: {e}")
if should_send_email(on_success=False):
send_status_email(
f"❌ Error posting: {title}",
generate_email_html("error", title, link, str(e))
)
time.sleep(5)
if __name__ == "__main__":
INTERVAL_MINUTES = int(os.getenv("INTERVAL_MINUTES", 30))
logger.info(f"🔁 Starting feed check every {INTERVAL_MINUTES} minutes.")
start_health_server()
while True:
try:
main()
except Exception as e:
logger.error(f"Unhandled error during execution: {e}")
logger.info(f"⏳ Waiting {INTERVAL_MINUTES} minutes until next run...")
time.sleep(INTERVAL_MINUTES * 60)