# gogmilestone2.py
import html
import os
import re
from urllib.parse import urlencode

import requests
from requests.auth import HTTPBasicAuth
from flask import Flask, request, jsonify
  4. app = Flask(__name__)
  5. # Config
  6. LOGIN_URL = "http://localhost:7777/user/login"
  7. EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
  8. EVOLUZ_URL = "http://192.168.1.33:5050/apps/team/detail?"
  9. USERNAME = "setyotontowi"
  10. PASSWORD = "nasipadang"
  11. session = requests.Session() # Keep session across requests
  12. def login():
  13. """Log in and store session cookies."""
  14. print("Logging in...")
  15. payload = {
  16. "user_name": USERNAME,
  17. "password": PASSWORD
  18. }
  19. response = session.post(LOGIN_URL, data=payload)
  20. if response.status_code == 200:
  21. print("Login successful.")
  22. return True
  23. else:
  24. print(f"Login failed: {response.status_code} - {response.text}")
  25. return False
  26. def get_csrf_token():
  27. """Fetch CSRF token from the edit milestone page."""
  28. print("Fetching CSRF token...")
  29. response = session.get(EDIT_MILESTONE_URL)
  30. if response.status_code == 200:
  31. token = extract_csrf_token(response.text)
  32. if token:
  33. print(f"CSRF token obtained: {token}")
  34. return token
  35. else:
  36. print("Failed to extract CSRF token.")
  37. return None
  38. elif response.status_code == 401: # Unauthorized → Need to re-login
  39. print("Session expired, re-logging in...")
  40. if login():
  41. return get_csrf_token()
  42. else:
  43. print("Failed to log in while fetching CSRF token.")
  44. return None
  45. else:
  46. print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
  47. return None
  48. def extract_csrf_token(html):
  49. """Extract CSRF token from HTML using regex."""
  50. import re
  51. match = re.search(r'name="_csrf" value="(.*?)"', html)
  52. if match:
  53. return match.group(1)
  54. return None
  55. def update_milestone(pr_data):
  56. login()
  57. """Update milestone with PR title using CSRF token."""
  58. csrf_token = get_csrf_token()
  59. if not csrf_token:
  60. print("Failed to get CSRF token.")
  61. return False
  62. content = create_milestone_content(pr_data)
  63. with open("content.md", "w") as file:
  64. file.write(content)
  65. payload = {
  66. "_csrf": csrf_token,
  67. "title":"Should change perhaps",
  68. "content": content,
  69. "deadline": "2025-03-31"
  70. }
  71. headers = {
  72. "Content-Type": "application/x-www-form-urlencoded"
  73. }
  74. response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
  75. if response.status_code == 200:
  76. print("Milestone updated successfully!")
  77. return True
  78. elif response.status_code == 401: # Unauthorized → Need to re-login
  79. print("Session expired, re-logging in...")
  80. if login():
  81. return update_milestone(pr_data)
  82. else:
  83. print(f"Failed to update milestone: {response.status_code} - {response.text}")
  84. return False
  85. def create_milestone_content(pr_data):
  86. content = get_milestone()
  87. if content:
  88. # From PR Data, get branch type, and username to assign it to identifier. like feature owi, then it should be
  89. # in feature_lambda
  90. type_task = pr_data.get("task_type")
  91. squad = "sigma"
  92. identifier = f'{type_task}_{squad}'
  93. content['content'] = append_item_to_identifier(content['content'], identifier, pr_data)
  94. return f"{content['content']}\n"
  95. else:
  96. return f"- [] {pr_data['title']}"
  97. def get_milestone():
  98. import re
  99. response = session.get(EDIT_MILESTONE_URL)
  100. if response.status_code == 200:
  101. title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
  102. content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
  103. title = title_match.group(1) if title_match else None
  104. content = content_match.group(1).strip() if content_match else None
  105. return {'title': title, 'content': content}
  106. else:
  107. return None
  108. def append_item_to_identifier(encoded_content, identifier, new_item):
  109. # Extract the block delimited by the identifier markers
  110. import re
  111. import html
  112. content = html.unescape(encoded_content)
  113. block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
  114. block_match = re.search(block_pattern, content, re.DOTALL)
  115. if not block_match:
  116. print(f"Block with identifier '{identifier}' not found.")
  117. return False
  118. # Extract the parts of the block
  119. start_marker = block_match.group(1)
  120. block_content = block_match.group(2).strip()
  121. end_marker = block_match.group(3)
  122. # Append the new item to the block content
  123. formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
  124. print(formatted_item)
  125. updated_block_content = f"{block_content}\n{formatted_item}"
  126. # Reconstruct the content with the updated block
  127. updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
  128. content = content.replace(block_match.group(0), updated_content)
  129. print(f"New item appended to '{identifier}' block.")
  130. return content
  131. def find_username_id(username):
  132. """
  133. Finds the ID for a given username from the username_id file.
  134. Args:
  135. username (str): The username to look up.
  136. file_path (str): Path to the username_id file.
  137. Returns:
  138. str: The ID of the username, or None if not found.
  139. """
  140. try:
  141. with open("username_id", "r", encoding="utf-8") as file:
  142. for line in file:
  143. if line.strip(): # Skip empty lines
  144. user, user_id = line.strip().split(":")
  145. if user == username:
  146. return user_id
  147. print(f"Username '{username}' not found.")
  148. return None
  149. except FileNotFoundError:
  150. print(f"File not found: {file_path}")
  151. return None
  152. except Exception as e:
  153. print(f"An error occurred: {e}")
  154. return None
  155. def generate_evoluz_link(username_id, task_id) :
  156. team_id = f"teamID={username_id}"
  157. update_team_id = f"updateTeamId={task_id}"
  158. full_url = f"{EVOLUZ_URL}{team_id}&{update_team_id}"
  159. return full_url
  160. def find_task_type(branch_name) :
  161. """
  162. Extracts the branch type and task ID from the branch name.
  163. Args:
  164. branch_name (str): The branch name (e.g., "feature/1234_task_name").
  165. Returns:
  166. dict: A dictionary with 'branch_type' and 'task_id', or None if invalid format.
  167. """
  168. try:
  169. # Split the branch name by "/"
  170. parts = branch_name.split("/")
  171. if len(parts) >= 2:
  172. branch_type = parts[0] # The first part is the branch type
  173. task = parts[1]
  174. task_id = task.split("_")[0] # The second part is the task ID
  175. return {"task_type": branch_type, "task_id": task_id}
  176. else:
  177. print(f"Invalid branch name format: {branch_name}")
  178. return None
  179. except Exception as e:
  180. print(f"Error extracting task type: {e}")
  181. return None
  182. @app.route('/webhook', methods=['POST'])
  183. def handle_webhook():
  184. if request.method != 'POST':
  185. return 'Invalid request method', 405
  186. try:
  187. payload = request.get_json()
  188. if not payload:
  189. return 'Invalid JSON payload', 400
  190. # Handle Pull Request Event
  191. if 'pull_request' in payload:
  192. pr = payload.get('pull_request', {})
  193. pr_title = pr.get('title')
  194. pr_username = pr.get('user').get('username')
  195. pr_username_id = find_username_id(pr_username)
  196. pr_branch = pr.get('head_branch')
  197. pr_branch_data = find_task_type(pr_branch)
  198. pr_data = {
  199. 'title': pr_title,
  200. 'username': pr_username.capitalize(),
  201. 'branch': pr_branch,
  202. 'task_type': pr_branch_data.get("task_type"),
  203. 'user_id': find_username_id(pr_username),
  204. 'link_evoluz': generate_evoluz_link(pr_username_id, pr_branch_data.get("task_id"))
  205. }
  206. print(f"Received PR: {pr_title}")
  207. # Update milestone with PR title
  208. if update_milestone(pr_data):
  209. print("Milestone updated with PR title.")
  210. else:
  211. print("Failed to update milestone.")
  212. return jsonify({'status': 'success'}), 200
  213. except Exception as e:
  214. print(f"Error: {e}")
  215. return 'Internal server error', 500
  216. if __name__ == '__main__':
  217. if login(): # Attempt login at startup
  218. print("Initial login successful.")
  219. milestone = get_milestone()
  220. print(milestone)
  221. else:
  222. print("Initial login failed.")
  223. app.run(port=4001)