123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import requests
- from requests.auth import HTTPBasicAuth
- from flask import Flask, request, jsonify
- app = Flask(__name__)
- # Config
- LOGIN_URL = "http://localhost:7777/user/login"
- EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
- USERNAME = "setyotontowi"
- PASSWORD = "nasipadang"
- session = requests.Session() # Keep session across requests
- def login():
- """Log in and store session cookies."""
- print("Logging in...")
- payload = {
- "user_name": USERNAME,
- "password": PASSWORD
- }
- response = session.post(LOGIN_URL, data=payload)
-
- if response.status_code == 200:
- print("Login successful.")
- return True
- else:
- print(f"Login failed: {response.status_code} - {response.text}")
- return False
- def get_csrf_token():
- """Fetch CSRF token from the edit milestone page."""
- print("Fetching CSRF token...")
- response = session.get(EDIT_MILESTONE_URL)
- if response.status_code == 200:
- token = extract_csrf_token(response.text)
- if token:
- print(f"CSRF token obtained: {token}")
- return token
- else:
- print("Failed to extract CSRF token.")
- return None
- elif response.status_code == 401: # Unauthorized → Need to re-login
- print("Session expired, re-logging in...")
- if login():
- return get_csrf_token()
- else:
- print("Failed to log in while fetching CSRF token.")
- return None
- else:
- print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
- return None
- def extract_csrf_token(html):
- """Extract CSRF token from HTML using regex."""
- import re
- match = re.search(r'name="_csrf" value="(.*?)"', html)
- if match:
- return match.group(1)
- return None
- def update_milestone(title):
- """Update milestone with PR title using CSRF token."""
- csrf_token = get_csrf_token()
- if not csrf_token:
- print("Failed to get CSRF token.")
- return False
- payload = {
- "_csrf": csrf_token,
- "title": title,
- "content": "PR updated via webhook",
- "deadline": "2025-03-31"
- }
- headers = {
- "Content-Type": "application/x-www-form-urlencoded"
- }
- print(f"Sending POST request to update milestone with title: '{title}'...")
- response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
- if response.status_code == 200:
- print("Milestone updated successfully!")
- return True
- elif response.status_code == 401: # Unauthorized → Need to re-login
- print("Session expired, re-logging in...")
- if login():
- return update_milestone(title)
- else:
- print(f"Failed to update milestone: {response.status_code} - {response.text}")
- return False
- @app.route('/webhook', methods=['POST'])
- def handle_webhook():
- if request.method != 'POST':
- return 'Invalid request method', 405
-
- try:
- payload = request.get_json()
- if not payload:
- return 'Invalid JSON payload', 400
- # Handle Pull Request Event
- if 'pull_request' in payload:
- pr = payload.get('pull_request', {})
- pr_title = pr.get('title')
- print(f"Received PR: {pr_title}")
- # Update milestone with PR title
- if update_milestone(pr_title):
- print("Milestone updated with PR title.")
- else:
- print("Failed to update milestone.")
- return jsonify({'status': 'success'}), 200
- except Exception as e:
- print(f"Error: {e}")
- return 'Internal server error', 500
- if __name__ == '__main__':
- if login(): # Attempt login at startup
- print("Initial login successful.")
- else:
- print("Initial login failed.")
- app.run(port=4001)
|