import re
import html

from utils import find_squad
from auth import login, get_csrf_token, EDIT_MILESTONE_URL, session

# Upper bound on POST attempts per update_milestone() call: the initial
# attempt plus one retry after a 401-triggered re-login. Prevents unbounded
# recursion when the server keeps rejecting the session.
_MAX_UPDATE_ATTEMPTS = 2

# Patterns used to scrape the current values out of the milestone edit form.
# NOTE(review): the "<input" / "<textarea ... </textarea>" tag names are
# reconstructed from how the captured values are used (single-line `value=`
# attributes vs. a multi-line DOTALL body) - confirm against the real page.
_TITLE_RE = re.compile(
    r'<input[^>]*name=["\']title["\'][^>]*value=(?P<q>["\'])(?P<value>.*?)(?P=q)',
    re.IGNORECASE,
)
_CONTENT_RE = re.compile(
    r'<textarea[^>]*name=["\']content["\'][^>]*>(?P<value>.*?)</textarea>',
    re.IGNORECASE | re.DOTALL,
)
_DEADLINE_RE = re.compile(
    r'<input[^>]*name=["\']deadline["\'][^>]*value=(?P<q>["\'])(?P<value>.*?)(?P=q)',
    re.IGNORECASE,
)


def update_milestone(pr_data):
    """Append the PR described by ``pr_data`` to the milestone and save it.

    Logs in, fetches a CSRF token and the current milestone form values,
    builds the new milestone body with :func:`create_milestone_content`,
    writes that body to ``content.md`` and POSTs it to ``EDIT_MILESTONE_URL``.
    A 401 response triggers one re-login and one retry.

    Args:
        pr_data: Mapping describing the PR. Keys read here or in the helpers:
            ``task_type``, ``title``, ``username``, ``link_evoluz``.

    Returns:
        ``True`` when the server answers 200, ``False`` on any failure
        (missing CSRF token, milestone could not be fetched, target block
        not found, failed re-login, or any other HTTP status).
    """
    login()

    for _attempt in range(_MAX_UPDATE_ATTEMPTS):
        csrf_token = get_csrf_token()
        if not csrf_token:
            print("Failed to get CSRF token.")
            return False

        meta = get_milestone(session)
        if meta is None:
            # get_milestone() already reported the HTTP status.
            print("Failed to update milestone: current milestone could not be fetched.")
            return False

        content = create_milestone_content(meta.get("content"), pr_data)
        if content is None:
            # Never POST a placeholder body: that would overwrite the
            # milestone with garbage.
            print("Failed to update milestone: target block not found in content.")
            return False

        print("Updating milestone with content:\n")
        # Explicit encoding so non-ASCII titles/usernames do not depend on
        # the platform default.
        with open("content.md", "w", encoding="utf-8") as file:
            file.write(content)

        payload = {
            "_csrf": csrf_token,
            "title": meta.get("title"),
            "content": content,
            "deadline": meta.get("deadline"),
        }
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }

        response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)

        if response.status_code == 200:
            print("Milestone updated successfully!")
            return True

        if response.status_code == 401:  # Unauthorized -> need to re-login
            print("Session expired, re-logging in...")
            if not login():
                print("Failed to update milestone: re-login failed.")
                return False
            continue  # retry with a fresh CSRF token and milestone snapshot

        print(f"Failed to update milestone: {response.status_code} - {response.text}")
        return False

    print("Failed to update milestone: session still unauthorized after re-login.")
    return False


def create_milestone_content(content, pr_data):
    """Build the new milestone body containing the PR from ``pr_data``.

    Args:
        content: Current (HTML-escaped) milestone body, or a falsy value when
            the milestone has no body yet.
        pr_data: Mapping describing the PR (see :func:`update_milestone`).

    Returns:
        The new body as a string. When ``content`` is falsy this is a single
        list item holding the PR title. When ``content`` is present but the
        target block cannot be found, ``None`` is returned so callers can
        abort instead of saving a bogus body.
    """
    if not content:
        # NOTE(review): "- []" is kept byte-for-byte; Markdown task lists
        # normally use "- [ ]" - confirm which one is intended.
        return f"- [] {pr_data['title']}"

    # The target block is named "<task_type>_<squad>". Per the original note,
    # the squad is meant to be derived from the PR author's username (e.g. a
    # "feature" PR by user "owi" belongs in "feature_lambda").
    # TODO(review): squad is hard-coded to "sigma"; presumably
    # utils.find_squad should supply it - confirm its signature and wire it in.
    type_task = pr_data.get("task_type")
    squad = "sigma"
    identifier = f"{type_task}_{squad}"

    updated = append_item_to_identifier(content, identifier, pr_data)
    if updated is False:
        return None
    return f"{updated}\n"


def get_milestone(session):
    """Fetch the milestone edit form and extract its current field values.

    Args:
        session: HTTP session object exposing ``get(url)``; the response must
            provide ``status_code`` and ``text``.

    Returns:
        A dict with keys ``title``, ``content`` and ``deadline`` (each
        ``None`` when the field is not found in the page), or ``None`` when
        the request does not return status 200. ``content`` is returned
        still HTML-escaped and stripped of surrounding whitespace; ``title``
        is HTML-unescaped so that posting it back does not double-escape
        entities.
    """
    response = session.get(EDIT_MILESTONE_URL)
    if response.status_code != 200:
        print(f"Failed to fetch milestone: {response.status_code}")
        return None

    page = response.text
    title_match = _TITLE_RE.search(page)
    content_match = _CONTENT_RE.search(page)
    deadline_match = _DEADLINE_RE.search(page)

    title = html.unescape(title_match.group("value")) if title_match else None
    content = content_match.group("value").strip() if content_match else None
    deadline = deadline_match.group("value") if deadline_match else None

    return {"title": title, "content": content, "deadline": deadline}


def append_item_to_identifier(encoded_content, identifier, new_item):
    """Append a checked list item to the block delimited by ``identifier``.

    Args:
        encoded_content: HTML-escaped milestone body.
        identifier: Name of the block whose markers delimit the target list.
        new_item: Mapping with keys ``username``, ``title`` and
            ``link_evoluz`` used to format the new list item.

    Returns:
        The HTML-unescaped body with the new item added as the last line of
        the block, or ``False`` when no block with that identifier exists.
    """
    content = html.unescape(encoded_content)

    # NOTE(review): the marker text was not recoverable from the source; the
    # block is assumed to be delimited by HTML comments of the form
    # "<!-- name -->" ... "<!-- /name -->". Confirm against a real milestone
    # body and adjust both markers if the convention differs.
    ident = re.escape(identifier)  # identifier is data, not a regex
    block_pattern = rf"(<!--\s*{ident}\s*-->)(.*?)(<!--\s*/{ident}\s*-->)"
    block_match = re.search(block_pattern, content, re.DOTALL)

    if not block_match:
        print(f"Block with identifier '{identifier}' not found.")
        return False

    start_marker, block_content, end_marker = block_match.groups()
    block_content = block_content.strip()

    formatted_item = (
        f"- [x] {new_item['username']}: "
        f"[{new_item['title']}]({new_item['link_evoluz']})"
    )

    # Skip the existing-content line when the block is empty so no blank
    # line is left between the start marker and the first item.
    block_lines = [start_marker]
    if block_content:
        block_lines.append(block_content)
    block_lines.extend([formatted_item, end_marker])
    updated_block = "\n".join(block_lines)

    # Splice by position so only the matched block is rewritten, even if the
    # same text occurs elsewhere in the body.
    content = content[:block_match.start()] + updated_block + content[block_match.end():]

    print(f"New item appended to '{identifier}' block.")
    return content