"""Webhook receiver that records pull requests in a milestone's content.

The service logs in to the host configured in ``LOGIN_URL``, scrapes the
milestone edit form at ``EDIT_MILESTONE_URL`` for its CSRF token and current
content, appends one checklist line per pull request, and posts the form back.
"""
import os
import re
from html import unescape as html_unescape

import requests
from requests.auth import HTTPBasicAuth  # unused here; kept for compatibility
from flask import Flask, request, jsonify

app = Flask(__name__)

# Config
LOGIN_URL = "http://localhost:7777/user/login"
EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
# Credentials may be supplied through the environment; the literals are only
# fallbacks so existing deployments keep working.
# NOTE(review): a password committed to source should be rotated and removed.
USERNAME = os.environ.get("MILESTONE_BOT_USERNAME", "setyotontowi")
PASSWORD = os.environ.get("MILESTONE_BOT_PASSWORD", "nasipadang")
# Seconds to wait on any single HTTP call; without a timeout `requests` can
# block forever and stall the webhook handler.
REQUEST_TIMEOUT = 10

session = requests.Session()  # Keep session across requests

_CSRF_TOKEN_RE = re.compile(r'name="_csrf" value="(.*?)"')


def login():
    """Log in and store session cookies on the shared ``session``.

    Returns:
        bool: True when the login endpoint answers with HTTP 200, else False.

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    print("Logging in...")
    payload = {
        "user_name": USERNAME,
        "password": PASSWORD,
    }
    response = session.post(LOGIN_URL, data=payload, timeout=REQUEST_TIMEOUT)
    if response.status_code == 200:
        print("Login successful.")
        return True
    print(f"Login failed: {response.status_code} - {response.text}")
    return False


def get_csrf_token(_retry=True):
    """Fetch the CSRF token from the edit milestone page.

    Args:
        _retry: internal flag; when True a single re-login and refetch is
            attempted after an HTTP 401, which bounds the recursion.

    Returns:
        str | None: the token, or None when it could not be obtained.

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    print("Fetching CSRF token...")
    response = session.get(EDIT_MILESTONE_URL, timeout=REQUEST_TIMEOUT)

    if response.status_code == 200:
        token = extract_csrf_token(response.text)
        if token:
            print(f"CSRF token obtained: {token}")
            return token
        print("Failed to extract CSRF token.")
        return None

    if response.status_code == 401:  # Unauthorized -> need to re-login
        if not _retry:
            print("Still unauthorized after re-login; giving up.")
            return None
        print("Session expired, re-logging in...")
        if login():
            return get_csrf_token(_retry=False)
        print("Failed to log in while fetching CSRF token.")
        return None

    print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
    return None


def extract_csrf_token(html):
    """Extract the CSRF token from HTML using a regex.

    Args:
        html: page markup to search.

    Returns:
        str | None: the captured ``value`` of the ``_csrf`` field, or None
        when the field is not present.
    """
    match = _CSRF_TOKEN_RE.search(html)
    if match:
        return match.group(1)
    return None


def update_milestone(pr_data, _retry=True):
    """Update the milestone with the pull request described by ``pr_data``.

    Args:
        pr_data: dict with the keys ``title``, ``username`` and ``branch``.
        _retry: internal flag; when True a single re-login and resubmit is
            attempted after an HTTP 401, which bounds the recursion.

    Returns:
        bool: True when the edit form was accepted with HTTP 200, else False.

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    # Best-effort refresh of the session; a failure is already reported by
    # login() and get_csrf_token() below still re-logs in on HTTP 401.
    login()

    csrf_token = get_csrf_token()
    if not csrf_token:
        print("Failed to get CSRF token.")
        return False

    content = create_milestone_content(pr_data)
    if content is None:
        # Never post a placeholder over the existing milestone text.
        print("Failed to build milestone content; milestone left unchanged.")
        return False

    # Local copy of what is about to be posted.
    with open("content.md", "w", encoding="utf-8") as file:
        file.write(content)

    # Title and deadline are fixed values; the title read by get_milestone()
    # is not reused here.
    payload = {
        "_csrf": csrf_token,
        "title": "Should change perhaps",
        "content": content,
        "deadline": "2025-03-31",
    }
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }

    response = session.post(
        EDIT_MILESTONE_URL,
        data=payload,
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code == 200:
        print("Milestone updated successfully!")
        return True

    if response.status_code == 401:  # Unauthorized -> need to re-login
        if not _retry:
            print("Still unauthorized after re-login; giving up.")
            return False
        print("Session expired, re-logging in...")
        if login():
            return update_milestone(pr_data, _retry=False)
        print("Failed to log in while updating milestone.")
        return False

    print(f"Failed to update milestone: {response.status_code} - {response.text}")
    return False


def create_milestone_content(pr_data):
    """Build the milestone content that includes the new pull request.

    Args:
        pr_data: dict with the keys ``title``, ``username`` and ``branch``.

    Returns:
        str | None: the new content, or None when the current content could
        not be read or the target block was not found (the caller must then
        skip the update).
    """
    milestone = get_milestone()
    if not milestone:
        # NOTE(review): this replaces the whole milestone text with a single
        # item whenever the edit page could not be fetched — confirm intended.
        return f"- [] {pr_data['title']}"

    existing = milestone['content']
    if existing is None:
        print("Milestone content not found on the edit page.")
        return None

    # From PR Data, get branch type, and username to assign it to identifier.
    # like feature owi, then it should be in feature_lambda
    updated = append_item_to_identifier(existing, 'feature_sigma', pr_data)
    if updated is False:
        # Block not found: do not turn the False sentinel into content.
        return None
    return f"{updated}\n"


def get_milestone():
    """Fetch the current title and content from the edit milestone page.

    Returns:
        dict | None: ``{'title': ..., 'content': ...}`` where either value is
        None when its pattern does not match, or None when the page did not
        answer with HTTP 200.

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    response = session.get(EDIT_MILESTONE_URL, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        return None

    # NOTE(review): both patterns start with a bare `]*`, and the content
    # pattern ends in a lazy group with nothing after it, so it can only ever
    # capture an empty string. They look like the opening/closing HTML tag
    # text was lost (presumably `<input[^>` / `<textarea[^>` ... `</textarea>`)
    # — TODO confirm against the real page and restore.
    title_match = re.search(
        r']*name=["\']title["\'][^>]*value=["\'](.*?)["\']',
        response.text,
        re.IGNORECASE,
    )
    content_match = re.search(
        r']*name=["\']content["\'][^>]*>(.*?)',
        response.text,
        re.IGNORECASE | re.DOTALL,
    )

    title = title_match.group(1) if title_match else None
    content = content_match.group(1).strip() if content_match else None
    return {'title': title, 'content': content}


def append_item_to_identifier(encoded_content, identifier, new_item):
    """Append a checklist line for ``new_item`` to the identified block.

    Args:
        encoded_content: HTML-escaped milestone content.
        identifier: name of the block the item belongs to.
        new_item: dict with the keys ``username``, ``title`` and ``branch``.

    Returns:
        str | bool: the unescaped content with the item appended, or False
        when no block matched.
    """
    content = html_unescape(encoded_content)

    # Extract the block delimited by the identifier markers
    # NOTE(review): this pattern never references `identifier` and all three
    # groups can match nothing, so it matches an empty string at offset 0 and
    # the "not found" branch cannot trigger. The start/end marker text appears
    # to have been lost from the pattern — TODO restore the real markers.
    block_pattern = rf'()(.*?)()'
    block_match = re.search(block_pattern, content, re.DOTALL)

    if not block_match:
        print(f"Block with identifier '{identifier}' not found.")
        return False

    # Extract the parts of the block
    start_marker = block_match.group(1)
    block_content = block_match.group(2).strip()
    end_marker = block_match.group(3)

    # Append the new item to the block content
    formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['branch']})"
    updated_block_content = f"{block_content}\n{formatted_item}"

    # Reconstruct the content with the updated block
    updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"

    # Splice by position: str.replace() would rewrite every occurrence of the
    # matched text and, for an empty match, insert between every character.
    content = (
        content[:block_match.start()]
        + updated_content
        + content[block_match.end():]
    )

    print(f"New item appended to '{identifier}' block.")
    return content


@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle a webhook delivery and record pull request events.

    Returns:
        A ``(body, status)`` tuple: 400 for a missing, malformed or non-object
        JSON body, 500 when processing raises, otherwise 200. A failed
        milestone update is only logged and still answers 200.
    """
    # The route only accepts POST, so no method check is needed here.
    try:
        # silent=True yields None instead of raising on a bad body or content
        # type, so those requests get the 400 below rather than a 500.
        payload = request.get_json(silent=True)
        if not payload or not isinstance(payload, dict):
            return 'Invalid JSON payload', 400

        # Handle Pull Request Event
        if 'pull_request' in payload:
            pr = payload.get('pull_request') or {}
            user = pr.get('user') or {}  # tolerate a missing or null user
            pr_title = pr.get('title')
            pr_username = user.get('username')
            pr_branch = pr.get('head_branch')

            pr_data = {
                'title': pr_title,
                'username': pr_username,
                'branch': pr_branch,
            }

            print(f"Received PR: {pr_title}")

            # Update milestone with PR title
            if update_milestone(pr_data):
                print("Milestone updated with PR title.")
            else:
                print("Failed to update milestone.")

        return jsonify({'status': 'success'}), 200

    except Exception as e:
        # Top-level boundary: report the error and answer 500.
        print(f"Error: {e}")
        return 'Internal server error', 500


if __name__ == '__main__':
    if login():  # Attempt login at startup
        print("Initial login successful.")
        milestone = get_milestone()
        print(milestone)
    else:
        print("Initial login failed.")

    app.run(port=4001)