import requests
from requests.auth import HTTPBasicAuth
from flask import Flask, request, jsonify
app = Flask(__name__)
# Config
LOGIN_URL = "http://localhost:7777/user/login"
EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
EVOLUZ_URL = "http://192.168.1.33:5050/apps/team/detail?"
USERNAME = "setyotontowi"
PASSWORD = "nasipadang"
session = requests.Session() # Keep session across requests
def login():
"""Log in and store session cookies."""
print("Logging in...")
payload = {
"user_name": USERNAME,
"password": PASSWORD
}
response = session.post(LOGIN_URL, data=payload)
if response.status_code == 200:
print("Login successful.")
return True
else:
print(f"Login failed: {response.status_code} - {response.text}")
return False
def get_csrf_token():
"""Fetch CSRF token from the edit milestone page."""
print("Fetching CSRF token...")
response = session.get(EDIT_MILESTONE_URL)
if response.status_code == 200:
token = extract_csrf_token(response.text)
if token:
print(f"CSRF token obtained: {token}")
return token
else:
print("Failed to extract CSRF token.")
return None
elif response.status_code == 401: # Unauthorized → Need to re-login
print("Session expired, re-logging in...")
if login():
return get_csrf_token()
else:
print("Failed to log in while fetching CSRF token.")
return None
else:
print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
return None
def extract_csrf_token(html):
"""Extract CSRF token from HTML using regex."""
import re
match = re.search(r'name="_csrf" value="(.*?)"', html)
if match:
return match.group(1)
return None
def update_milestone(pr_data):
login()
"""Update milestone with PR title using CSRF token."""
csrf_token = get_csrf_token()
if not csrf_token:
print("Failed to get CSRF token.")
return False
content = create_milestone_content(pr_data)
with open("content.md", "w") as file:
file.write(content)
payload = {
"_csrf": csrf_token,
"title":"Should change perhaps",
"content": content,
"deadline": "2025-03-31"
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
if response.status_code == 200:
print("Milestone updated successfully!")
return True
elif response.status_code == 401: # Unauthorized → Need to re-login
print("Session expired, re-logging in...")
if login():
return update_milestone(pr_data)
else:
print(f"Failed to update milestone: {response.status_code} - {response.text}")
return False
def create_milestone_content(pr_data):
content = get_milestone()
if content:
# From PR Data, get branch type, and username to assign it to identifier. like feature owi, then it should be
# in feature_lambda
type_task = pr_data.get("task_type")
squad = "sigma"
identifier = f'{type_task}_{squad}'
content['content'] = append_item_to_identifier(content['content'], identifier, pr_data)
return f"{content['content']}\n"
else:
return f"- [] {pr_data['title']}"
def get_milestone():
import re
response = session.get(EDIT_MILESTONE_URL)
if response.status_code == 200:
title_match = re.search(r']*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
content_match = re.search(r'', response.text, re.IGNORECASE | re.DOTALL)
title = title_match.group(1) if title_match else None
content = content_match.group(1).strip() if content_match else None
return {'title': title, 'content': content}
else:
return None
def append_item_to_identifier(encoded_content, identifier, new_item):
# Extract the block delimited by the identifier markers
import re
import html
content = html.unescape(encoded_content)
block_pattern = rf'()(.*?)()'
block_match = re.search(block_pattern, content, re.DOTALL)
if not block_match:
print(f"Block with identifier '{identifier}' not found.")
return False
# Extract the parts of the block
start_marker = block_match.group(1)
block_content = block_match.group(2).strip()
end_marker = block_match.group(3)
# Append the new item to the block content
formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
print(formatted_item)
updated_block_content = f"{block_content}\n{formatted_item}"
# Reconstruct the content with the updated block
updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
content = content.replace(block_match.group(0), updated_content)
print(f"New item appended to '{identifier}' block.")
return content
def find_username_id(username):
"""
Finds the ID for a given username from the username_id file.
Args:
username (str): The username to look up.
file_path (str): Path to the username_id file.
Returns:
str: The ID of the username, or None if not found.
"""
try:
with open("username_id", "r", encoding="utf-8") as file:
for line in file:
if line.strip(): # Skip empty lines
user, user_id = line.strip().split(":")
if user == username:
return user_id
print(f"Username '{username}' not found.")
return None
except FileNotFoundError:
print(f"File not found: {file_path}")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
def generate_evoluz_link(username_id, task_id) :
team_id = f"teamID={username_id}"
update_team_id = f"updateTeamId={task_id}"
full_url = f"{EVOLUZ_URL}{team_id}&{update_team_id}"
return full_url
def find_task_type(branch_name) :
"""
Extracts the branch type and task ID from the branch name.
Args:
branch_name (str): The branch name (e.g., "feature/1234_task_name").
Returns:
dict: A dictionary with 'branch_type' and 'task_id', or None if invalid format.
"""
try:
# Split the branch name by "/"
parts = branch_name.split("/")
if len(parts) >= 2:
branch_type = parts[0] # The first part is the branch type
task = parts[1]
task_id = task.split("_")[0] # The second part is the task ID
return {"task_type": branch_type, "task_id": task_id}
else:
print(f"Invalid branch name format: {branch_name}")
return None
except Exception as e:
print(f"Error extracting task type: {e}")
return None
@app.route('/webhook', methods=['POST'])
def handle_webhook():
if request.method != 'POST':
return 'Invalid request method', 405
try:
payload = request.get_json()
if not payload:
return 'Invalid JSON payload', 400
# Handle Pull Request Event
if 'pull_request' in payload:
pr = payload.get('pull_request', {})
pr_title = pr.get('title')
pr_username = pr.get('user').get('username')
pr_username_id = find_username_id(pr_username)
pr_branch = pr.get('head_branch')
pr_branch_data = find_task_type(pr_branch)
pr_data = {
'title': pr_title,
'username': pr_username.capitalize(),
'branch': pr_branch,
'task_type': pr_branch_data.get("task_type"),
'user_id': find_username_id(pr_username),
'link_evoluz': generate_evoluz_link(pr_username_id, pr_branch_data.get("task_id"))
}
print(f"Received PR: {pr_title}")
# Update milestone with PR title
if update_milestone(pr_data):
print("Milestone updated with PR title.")
else:
print("Failed to update milestone.")
return jsonify({'status': 'success'}), 200
except Exception as e:
print(f"Error: {e}")
return 'Internal server error', 500
if __name__ == '__main__':
if login(): # Attempt login at startup
print("Initial login successful.")
milestone = get_milestone()
print(milestone)
else:
print("Initial login failed.")
app.run(port=4001)