123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import requests
- from requests.auth import HTTPBasicAuth
- from flask import Flask, request, jsonify
- app = Flask(__name__)
- # Config
- LOGIN_URL = "http://localhost:7777/user/login"
- EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
- EVOLUZ_URL = "http://192.168.1.33:5050/apps/team/detail?"
- USERNAME = "setyotontowi"
- PASSWORD = "nasipadang"
- session = requests.Session() # Keep session across requests
- def login():
- """Log in and store session cookies."""
- print("Logging in...")
- payload = {
- "user_name": USERNAME,
- "password": PASSWORD
- }
- response = session.post(LOGIN_URL, data=payload)
-
- if response.status_code == 200:
- print("Login successful.")
- return True
- else:
- print(f"Login failed: {response.status_code} - {response.text}")
- return False
- def get_csrf_token():
- """Fetch CSRF token from the edit milestone page."""
- print("Fetching CSRF token...")
- response = session.get(EDIT_MILESTONE_URL)
- if response.status_code == 200:
- token = extract_csrf_token(response.text)
- if token:
- print(f"CSRF token obtained: {token}")
- return token
- else:
- print("Failed to extract CSRF token.")
- return None
- elif response.status_code == 401: # Unauthorized → Need to re-login
- print("Session expired, re-logging in...")
- if login():
- return get_csrf_token()
- else:
- print("Failed to log in while fetching CSRF token.")
- return None
- else:
- print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
- return None
- def extract_csrf_token(html):
- """Extract CSRF token from HTML using regex."""
- import re
- match = re.search(r'name="_csrf" value="(.*?)"', html)
- if match:
- return match.group(1)
- return None
- def update_milestone(pr_data):
- login()
- """Update milestone with PR title using CSRF token."""
- csrf_token = get_csrf_token()
- if not csrf_token:
- print("Failed to get CSRF token.")
- return False
- content = create_milestone_content(pr_data)
- with open("content.md", "w") as file:
- file.write(content)
- payload = {
- "_csrf": csrf_token,
- "title":"Should change perhaps",
- "content": content,
- "deadline": "2025-03-31"
- }
- headers = {
- "Content-Type": "application/x-www-form-urlencoded"
- }
- response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
- if response.status_code == 200:
- print("Milestone updated successfully!")
- return True
- elif response.status_code == 401: # Unauthorized → Need to re-login
- print("Session expired, re-logging in...")
- if login():
- return update_milestone(pr_data)
- else:
- print(f"Failed to update milestone: {response.status_code} - {response.text}")
- return False
- def create_milestone_content(pr_data):
- content = get_milestone()
- if content:
- # From PR Data, get branch type, and username to assign it to identifier. like feature owi, then it should be
- # in feature_lambda
- type_task = pr_data.get("task_type")
- squad = "sigma"
- identifier = f'{type_task}_{squad}'
- content['content'] = append_item_to_identifier(content['content'], identifier, pr_data)
- return f"{content['content']}\n"
- else:
- return f"- [] {pr_data['title']}"
- def get_milestone():
- import re
- response = session.get(EDIT_MILESTONE_URL)
- if response.status_code == 200:
- title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
- content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
- title = title_match.group(1) if title_match else None
- content = content_match.group(1).strip() if content_match else None
- return {'title': title, 'content': content}
- else:
-
- return None
- def append_item_to_identifier(encoded_content, identifier, new_item):
- # Extract the block delimited by the identifier markers
- import re
- import html
- content = html.unescape(encoded_content)
- block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
- block_match = re.search(block_pattern, content, re.DOTALL)
- if not block_match:
- print(f"Block with identifier '{identifier}' not found.")
- return False
- # Extract the parts of the block
- start_marker = block_match.group(1)
- block_content = block_match.group(2).strip()
- end_marker = block_match.group(3)
- # Append the new item to the block content
- formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
- print(formatted_item)
- updated_block_content = f"{block_content}\n{formatted_item}"
- # Reconstruct the content with the updated block
- updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
- content = content.replace(block_match.group(0), updated_content)
- print(f"New item appended to '{identifier}' block.")
- return content
- def find_username_id(username):
- """
- Finds the ID for a given username from the username_id file.
- Args:
- username (str): The username to look up.
- file_path (str): Path to the username_id file.
- Returns:
- str: The ID of the username, or None if not found.
- """
- try:
- with open("username_id", "r", encoding="utf-8") as file:
- for line in file:
- if line.strip(): # Skip empty lines
- user, user_id = line.strip().split(":")
- if user == username:
- return user_id
- print(f"Username '{username}' not found.")
- return None
- except FileNotFoundError:
- print(f"File not found: {file_path}")
- return None
- except Exception as e:
- print(f"An error occurred: {e}")
- return None
- def generate_evoluz_link(username_id, task_id) :
- team_id = f"teamID={username_id}"
- update_team_id = f"updateTeamId={task_id}"
- full_url = f"{EVOLUZ_URL}{team_id}&{update_team_id}"
- return full_url
- def find_task_type(branch_name) :
- """
- Extracts the branch type and task ID from the branch name.
- Args:
- branch_name (str): The branch name (e.g., "feature/1234_task_name").
- Returns:
- dict: A dictionary with 'branch_type' and 'task_id', or None if invalid format.
- """
- try:
- # Split the branch name by "/"
- parts = branch_name.split("/")
- if len(parts) >= 2:
- branch_type = parts[0] # The first part is the branch type
- task = parts[1]
- task_id = task.split("_")[0] # The second part is the task ID
- return {"task_type": branch_type, "task_id": task_id}
- else:
- print(f"Invalid branch name format: {branch_name}")
- return None
- except Exception as e:
- print(f"Error extracting task type: {e}")
- return None
- @app.route('/webhook', methods=['POST'])
- def handle_webhook():
- if request.method != 'POST':
- return 'Invalid request method', 405
-
- try:
- payload = request.get_json()
- if not payload:
- return 'Invalid JSON payload', 400
- # Handle Pull Request Event
- if 'pull_request' in payload:
- pr = payload.get('pull_request', {})
- pr_title = pr.get('title')
- pr_username = pr.get('user').get('username')
- pr_username_id = find_username_id(pr_username)
- pr_branch = pr.get('head_branch')
- pr_branch_data = find_task_type(pr_branch)
-
- pr_data = {
- 'title': pr_title,
- 'username': pr_username.capitalize(),
- 'branch': pr_branch,
- 'task_type': pr_branch_data.get("task_type"),
- 'user_id': find_username_id(pr_username),
- 'link_evoluz': generate_evoluz_link(pr_username_id, pr_branch_data.get("task_id"))
- }
- print(f"Received PR: {pr_title}")
- # Update milestone with PR title
- if update_milestone(pr_data):
- print("Milestone updated with PR title.")
- else:
- print("Failed to update milestone.")
- return jsonify({'status': 'success'}), 200
- except Exception as e:
- print(f"Error: {e}")
- return 'Internal server error', 500
- if __name__ == '__main__':
- if login(): # Attempt login at startup
- print("Initial login successful.")
- milestone = get_milestone()
- print(milestone)
- else:
- print("Initial login failed.")
- app.run(port=4001)
|