监控hackone报告机器人
找了个机器人,分享一下脚本。使用方法就是
创建钉钉机器人,选择自定义 WEBHOOK 类型,可通过自定义关键词或 IP 地址进行安全设置。创建完成后,复制机器人的 accesstoken。
将源代码中的 accesstoken 替换为您自己的,使用 Python3.8.X 在后台运行。您可以直接输入
python3 h1_notifier.py
& 或在 tmux 中运行。后续可能会过期无法使用,但是无所谓,我到时候再改就行。
import feedparser
import requests
import re
from bs4 import BeautifulSoup
import schedule
import time
import random
from datetime import datetime
import logging
logging.basicConfig(filename='script_output.log',
level=logging.INFO,
format='%(asctime)s - %(message)s')
# CONFIG
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token="
last_sent_tweet_pubdate = None # Add a global variable
def get_link_description(url):
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
meta_title_element = soup.find('meta', {'name': 'description'})
if meta_title_element and 'content' in meta_title_element.attrs:
description = meta_title_element['content'].replace('#', '')
else:
description = 'No Description Found'
return description
except Exception as e:
logging.info(f"Error getting description for {url}: {e}")
return None
def send_to_dingtalk(content, title):
headers = {"Content-Type": "application/json"}
data = {
"msgtype": "markdown",
"markdown": {"title": title, "text": content},
}
requests.post(DINGTALK_WEBHOOK, headers=headers, json=data)
def get_new_published_tweets(feed, last_sent_tweet_pubdate):
new_published_tweets = []
for entry in feed.entries:
tweet_pubdate = datetime.strptime(entry.published.replace("GMT", "+0000"), '%a, %d %b %Y %H:%M:%S %z')
if last_sent_tweet_pubdate is None or tweet_pubdate > last_sent_tweet_pubdate:
new_published_tweets.append(entry)
else:
break
return new_published_tweets
def process_tweet(url_list):
global last_sent_tweet_pubdate
# random rss_url
random.shuffle(url_list)
for url in url_list:
feed = feedparser.parse(url)
if len(feed.entries) > 0:
new_published_tweets = get_new_published_tweets(feed, last_sent_tweet_pubdate)
if new_published_tweets:
for entry in new_published_tweets:
tweet_text = entry.title
url_link = re.search(r'https://hackerone\.com/reports/\d+', tweet_text)
if url_link:
# Convert the link in the tweet to MarkDown format to support mobile access.
tweet_text = tweet_text.replace(url_link.group(), f"[{url_link.group()}]({url_link.group()})")
logging.info(f"RSS URL used: {url}, Latest tweet: {tweet_text[:30]}, Time: {last_sent_tweet_pubdate}")
tweet_pubdate = datetime.strptime(entry.published.replace("GMT", "+0000"), '%a, %d %b %Y %H:%M:%S %z')
hackerone_url_match = re.search(r'https://hackerone\.com/reports/\d+', tweet_text)
hackerone_url = hackerone_url_match.group()
link_description = get_link_description(hackerone_url)
if link_description:
content = f"**Tweet:** \n\n{tweet_text}\n\n**Description:**\n\n {link_description}\n\n"
send_to_dingtalk(content, link_description)
last_sent_tweet_pubdate = max(last_sent_tweet_pubdate, tweet_pubdate) if last_sent_tweet_pubdate else tweet_pubdate
else:
logging.info(f"Tweet does not contain hackerone.com, skipping, Time: {last_sent_tweet_pubdate}")
else:
logging.info(f"Tweet has already been sent, skipping, Time: {last_sent_tweet_pubdate}")
break
else:
logging.info("Unable to get valid RSS data from all provided URLs.")
def main():
global last_sent_tweet_pubdate
TWITTER_USER = 'disclosedh1'
TWITTER_RSS_URLS = [
f"https://nitter.net/{TWITTER_USER}/rss",
#f"https://nitter.unixfox.eu/{TWITTER_USER}/rss",
f"https://nitter.snopyta.org/{TWITTER_USER}/rss",
f"https://twitter.owacon.moe/{TWITTER_USER}/rss",
]
logging.info("Checking for new tweets...")
process_tweet(TWITTER_RSS_URLS)
schedule.every(10).minutes.do(process_tweet, TWITTER_RSS_URLS)
while True:
schedule.run_pending()
time.sleep(5)
if __name__ == '__main__':
main()
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。后续可能会有评论区,不过也可以在github联系我。