# gogmilestone2.py
import os

import requests
from flask import Flask, request, jsonify
from requests.auth import HTTPBasicAuth
  4. app = Flask(__name__)
  5. # Config
  6. LOGIN_URL = "http://localhost:7777/user/login"
  7. EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
  8. USERNAME = "setyotontowi"
  9. PASSWORD = "nasipadang"
  10. session = requests.Session() # Keep session across requests
  11. def login():
  12. """Log in and store session cookies."""
  13. print("Logging in...")
  14. payload = {
  15. "user_name": USERNAME,
  16. "password": PASSWORD
  17. }
  18. response = session.post(LOGIN_URL, data=payload)
  19. if response.status_code == 200:
  20. print("Login successful.")
  21. return True
  22. else:
  23. print(f"Login failed: {response.status_code} - {response.text}")
  24. return False
  25. def get_csrf_token():
  26. """Fetch CSRF token from the edit milestone page."""
  27. print("Fetching CSRF token...")
  28. response = session.get(EDIT_MILESTONE_URL)
  29. if response.status_code == 200:
  30. token = extract_csrf_token(response.text)
  31. if token:
  32. print(f"CSRF token obtained: {token}")
  33. return token
  34. else:
  35. print("Failed to extract CSRF token.")
  36. return None
  37. elif response.status_code == 401: # Unauthorized → Need to re-login
  38. print("Session expired, re-logging in...")
  39. if login():
  40. return get_csrf_token()
  41. else:
  42. print("Failed to log in while fetching CSRF token.")
  43. return None
  44. else:
  45. print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
  46. return None
  47. def extract_csrf_token(html):
  48. """Extract CSRF token from HTML using regex."""
  49. import re
  50. match = re.search(r'name="_csrf" value="(.*?)"', html)
  51. if match:
  52. return match.group(1)
  53. return None
  54. def update_milestone(pr_data):
  55. login()
  56. """Update milestone with PR title using CSRF token."""
  57. csrf_token = get_csrf_token()
  58. if not csrf_token:
  59. print("Failed to get CSRF token.")
  60. return False
  61. content = create_milestone_content(pr_data)
  62. with open("content.md", "w") as file:
  63. file.write(content)
  64. payload = {
  65. "_csrf": csrf_token,
  66. "content": content,
  67. "deadline": "2025-03-31"
  68. }
  69. headers = {
  70. "Content-Type": "application/x-www-form-urlencoded"
  71. }
  72. response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
  73. print("Response:", payload)
  74. if response.status_code == 200:
  75. print("Milestone updated successfully!")
  76. return True
  77. elif response.status_code == 401: # Unauthorized → Need to re-login
  78. print("Session expired, re-logging in...")
  79. if login():
  80. return update_milestone(pr_data)
  81. else:
  82. print(f"Failed to update milestone: {response.status_code} - {response.text}")
  83. return False
  84. def create_milestone_content(pr_data):
  85. content = get_milestone()
  86. if content:
  87. content['content'] = append_item_to_identifier(content['content'], 'feature_sigma', pr_data)
  88. return f"{content['content']}\n- [x] {pr_data['title']}"
  89. else:
  90. return f"- [] {pr_data['title']}"
  91. def get_milestone():
  92. import re
  93. response = session.get(EDIT_MILESTONE_URL)
  94. if response.status_code == 200:
  95. title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
  96. content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
  97. title = title_match.group(1) if title_match else None
  98. content = content_match.group(1).strip() if content_match else None
  99. return {'title': title, 'content': content}
  100. else:
  101. return None
  102. def append_item_to_identifier(encoded_content, identifier, new_item):
  103. # Extract the block delimited by the identifier markers
  104. import re
  105. import html
  106. content = html.unescape(encoded_content)
  107. block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
  108. block_match = re.search(block_pattern, content, re.DOTALL)
  109. if not block_match:
  110. print(f"Block with identifier '{identifier}' not found.")
  111. return False
  112. # Extract the parts of the block
  113. start_marker = block_match.group(1)
  114. block_content = block_match.group(2).strip()
  115. end_marker = block_match.group(3)
  116. # Append the new item to the block content
  117. formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['branch']})"
  118. updated_block_content = f"{block_content}\n{formatted_item}"
  119. # Reconstruct the content with the updated block
  120. updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
  121. content = content.replace(block_match.group(0), updated_content)
  122. print(f"New item appended to '{identifier}' block.")
  123. return content
  124. @app.route('/webhook', methods=['POST'])
  125. def handle_webhook():
  126. if request.method != 'POST':
  127. return 'Invalid request method', 405
  128. try:
  129. payload = request.get_json()
  130. if not payload:
  131. return 'Invalid JSON payload', 400
  132. # Handle Pull Request Event
  133. if 'pull_request' in payload:
  134. pr = payload.get('pull_request', {})
  135. pr_title = pr.get('title')
  136. pr_username = pr.get('user').get('username')
  137. pr_branch = pr.get('head_branch')
  138. pr_data = {
  139. 'title': pr_title,
  140. 'username': pr_username,
  141. 'branch': pr_branch
  142. }
  143. print(f"Received PR: {pr_title}")
  144. # Update milestone with PR title
  145. if update_milestone(pr_data):
  146. print("Milestone updated with PR title.")
  147. else:
  148. print("Failed to update milestone.")
  149. return jsonify({'status': 'success'}), 200
  150. except Exception as e:
  151. print(f"Error: {e}")
  152. return 'Internal server error', 500
  153. if __name__ == '__main__':
  154. if login(): # Attempt login at startup
  155. print("Initial login successful.")
  156. milestone = get_milestone()
  157. print(milestone)
  158. else:
  159. print("Initial login failed.")
  160. app.run(port=4001)