milestone.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import re
  2. import html
  3. from utils import find_squad
  4. from auth import login, get_csrf_token, EDIT_MILESTONE_URL, session
  5. def update_milestone(pr_data):
  6. login()
  7. """Update milestone with PR title using CSRF token."""
  8. csrf_token = get_csrf_token()
  9. if not csrf_token:
  10. print("Failed to get CSRF token.")
  11. return False
  12. meta = get_milestone(session)
  13. content = create_milestone_content(meta.get("content"), pr_data)
  14. print(f"Updating milestone with content:\n")
  15. with open("content.md", "w") as file:
  16. file.write(content)
  17. payload = {
  18. "_csrf": csrf_token,
  19. "title": meta.get("title"),
  20. "content": content,
  21. "deadline": meta.get("deadline")
  22. }
  23. headers = {
  24. "Content-Type": "application/x-www-form-urlencoded"
  25. }
  26. response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
  27. if response.status_code == 200:
  28. print("Milestone updated successfully!")
  29. return True
  30. elif response.status_code == 401: # Unauthorized → Need to re-login
  31. print("Session expired, re-logging in...")
  32. if login():
  33. return update_milestone(pr_data)
  34. else:
  35. print(f"Failed to update milestone: {response.status_code} - {response.text}")
  36. return False
  37. def create_milestone_content(content, pr_data):
  38. if content:
  39. # From PR Data, get branch type, and username to assign it to identifier. like feature owi, then it should be
  40. # in feature_lambda
  41. type_task = pr_data.get("task_type")
  42. squad = "sigma"
  43. identifier = f'{type_task}_{squad}'
  44. content = append_item_to_identifier(content, identifier, pr_data)
  45. return f"{content}\n"
  46. else:
  47. return f"- [] {pr_data['title']}"
  48. def get_milestone(session):
  49. """Fetch milestone details."""
  50. response = session.get(EDIT_MILESTONE_URL)
  51. if response.status_code == 200:
  52. title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.* ?)["\']', response.text, re.IGNORECASE)
  53. content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
  54. deadline_match = re.search(r'<input[^>]*name=["\']deadline["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
  55. title = title_match.group(1) if title_match else None
  56. content = content_match.group(1).strip() if content_match else None
  57. deadline = deadline_match.group(1) if deadline_match else None
  58. return {'title': title, 'content': content, 'deadline': deadline}
  59. else:
  60. print(f"Failed to fetch milestone: {response.status_code}")
  61. return None
  62. def append_item_to_identifier(encoded_content, identifier, new_item):
  63. # Extract the block delimited by the identifier markers
  64. content = html.unescape(encoded_content)
  65. block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
  66. block_match = re.search(block_pattern, content, re.DOTALL)
  67. if not block_match:
  68. print(f"Block with identifier '{identifier}' not found.")
  69. return False
  70. # Extract the parts of the block
  71. start_marker = block_match.group(1)
  72. block_content = block_match.group(2).strip()
  73. end_marker = block_match.group(3)
  74. # Append the new item to the block content
  75. formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
  76. updated_block_content = f"{block_content}\n{formatted_item}"
  77. # Reconstruct the content with the updated block
  78. updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
  79. content = content.replace(block_match.group(0), updated_content)
  80. print(f"New item appended to '{identifier}' block.")
  81. return content