Browse Source

separate function

Tontowi Prasetyo 5 months ago
parent
commit
d27a37e4de
7 changed files with 128 additions and 222 deletions
  1. BIN
      __pycache__/auth.cpython-310.pyc
  2. BIN
      __pycache__/milestone.cpython-310.pyc
  3. BIN
      __pycache__/utils.cpython-310.pyc
  4. 47 0
      auth.py
  5. 9 222
      gogmilestone2.py
  6. 34 0
      milestone.py
  7. 38 0
      utils.py

BIN
__pycache__/auth.cpython-310.pyc


BIN
__pycache__/milestone.cpython-310.pyc


BIN
__pycache__/utils.cpython-310.pyc


+ 47 - 0
auth.py

@@ -0,0 +1,47 @@
1
+import requests
2
+
3
+LOGIN_URL = "http://localhost:7777/user/login"
4
+EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
5
+session = requests.Session()
6
+
7
+def login(username, password):
8
+    """Log in and store session cookies."""
9
+    print("Logging in...")
10
+    payload = {
11
+        "user_name": username,
12
+        "password": password
13
+    }
14
+    response = session.post(LOGIN_URL, data=payload)
15
+    if response.status_code == 200:
16
+        print("Login successful.")
17
+        return True
18
+    else:
19
+        print(f"Login failed: {response.status_code} - {response.text}")
20
+        return False
21
+
22
+def get_csrf_token():
23
+    """Fetch CSRF token from the edit milestone page."""
24
+    print("Fetching CSRF token...")
25
+    response = session.get(EDIT_MILESTONE_URL)
26
+    if response.status_code == 200:
27
+        token = extract_csrf_token(response.text)
28
+        if token:
29
+            print(f"CSRF token obtained: {token}")
30
+            return token
31
+        else:
32
+            print("Failed to extract CSRF token.")
33
+            return None
34
+    elif response.status_code == 401:  # Unauthorized → Need to re-login
35
+        print("Session expired, re-logging in...")
36
+        return None
37
+    else:
38
+        print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
39
+        return None
40
+
41
+def extract_csrf_token(html):
42
+    """Extract CSRF token from HTML using regex."""
43
+    import re
44
+    match = re.search(r'name="_csrf" value="(.*?)"', html)
45
+    if match:
46
+        return match.group(1)
47
+    return None

+ 9 - 222
gogmilestone2.py

@@ -1,263 +1,50 @@
1
-import requests
2
-from requests.auth import HTTPBasicAuth
3
 from flask import Flask, request, jsonify
1
 from flask import Flask, request, jsonify
2
+from auth import login, get_csrf_token
3
+from milestone import get_milestone, append_item_to_identifier
4
+from utils import find_username_id, generate_evoluz_link, find_task_type
4
 
5
 
5
 app = Flask(__name__)
6
 app = Flask(__name__)
6
 
7
 
7
-# Config
8
-LOGIN_URL = "http://localhost:7777/user/login"
9
-EDIT_MILESTONE_URL = "http://localhost:7777/setyotontowi/experiments/milestones/1/edit"
10
-EVOLUZ_URL = "http://192.168.1.33:5050/apps/team/detail?"
11
 USERNAME = "setyotontowi"
8
 USERNAME = "setyotontowi"
12
 PASSWORD = "nasipadang"
9
 PASSWORD = "nasipadang"
13
-
14
-session = requests.Session()  # Keep session across requests
15
-
16
-def login():
17
-    """Log in and store session cookies."""
18
-    print("Logging in...")
19
-    payload = {
20
-        "user_name": USERNAME,
21
-        "password": PASSWORD
22
-    }
23
-    response = session.post(LOGIN_URL, data=payload)
24
-    
25
-    if response.status_code == 200:
26
-        print("Login successful.")
27
-        return True
28
-    else:
29
-        print(f"Login failed: {response.status_code} - {response.text}")
30
-        return False
31
-
32
-def get_csrf_token():
33
-    """Fetch CSRF token from the edit milestone page."""
34
-    print("Fetching CSRF token...")
35
-
36
-    response = session.get(EDIT_MILESTONE_URL)
37
-    if response.status_code == 200:
38
-        token = extract_csrf_token(response.text)
39
-        if token:
40
-            print(f"CSRF token obtained: {token}")
41
-            return token
42
-        else:
43
-            print("Failed to extract CSRF token.")
44
-            return None
45
-    elif response.status_code == 401:  # Unauthorized → Need to re-login
46
-        print("Session expired, re-logging in...")
47
-        if login():
48
-            return get_csrf_token()
49
-        else:
50
-            print("Failed to log in while fetching CSRF token.")
51
-            return None
52
-    else:
53
-        print(f"Failed to get CSRF token: {response.status_code} - {response.text}")
54
-        return None
55
-
56
-def extract_csrf_token(html):
57
-    """Extract CSRF token from HTML using regex."""
58
-    import re
59
-    match = re.search(r'name="_csrf" value="(.*?)"', html)
60
-    if match:
61
-        return match.group(1)
62
-    return None
63
-
64
-def update_milestone(pr_data):
65
-    login()
66
-    """Update milestone with PR title using CSRF token."""
67
-    csrf_token = get_csrf_token()
68
-    if not csrf_token:
69
-        print("Failed to get CSRF token.")
70
-        return False
71
-
72
-    content = create_milestone_content(pr_data)
73
-
74
-    with open("content.md", "w") as file:
75
-        file.write(content)
76
-
77
-    payload = {
78
-        "_csrf": csrf_token,
79
-        "title":"Should change perhaps",
80
-        "content": content,
81
-        "deadline": "2025-03-31"
82
-    }
83
-    headers = {
84
-        "Content-Type": "application/x-www-form-urlencoded"
85
-    }
86
-
87
-    response = session.post(EDIT_MILESTONE_URL, data=payload, headers=headers)
88
-
89
-    if response.status_code == 200:
90
-        print("Milestone updated successfully!")
91
-        return True
92
-    elif response.status_code == 401:  # Unauthorized → Need to re-login
93
-        print("Session expired, re-logging in...")
94
-        if login():
95
-            return update_milestone(pr_data)
96
-    else:
97
-        print(f"Failed to update milestone: {response.status_code} - {response.text}")
98
-        return False
99
-
100
-def create_milestone_content(pr_data):
101
-    content = get_milestone()
102
-    if content:
103
-        # From PR Data, get branch type, and username to assign it to identifier. like feature owi, then it should be
104
-        # in feature_lambda
105
-        type_task = pr_data.get("task_type")
106
-        squad = "sigma"
107
-        identifier = f'{type_task}_{squad}'
108
-        content['content'] = append_item_to_identifier(content['content'], identifier, pr_data)
109
-        return f"{content['content']}\n"
110
-    else:
111
-        return f"- [] {pr_data['title']}"
112
-
113
-def get_milestone():
114
-    import re
115
-    response = session.get(EDIT_MILESTONE_URL)
116
-    if response.status_code == 200:
117
-        title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
118
-        content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
119
-        title = title_match.group(1) if title_match else None
120
-        content = content_match.group(1).strip() if content_match else None
121
-        return {'title': title, 'content': content}
122
-    else:
123
-        
124
-        return None
125
-
126
-def append_item_to_identifier(encoded_content, identifier, new_item):
127
-    # Extract the block delimited by the identifier markers
128
-    import re
129
-    import html
130
-    content = html.unescape(encoded_content)
131
-    block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
132
-    block_match = re.search(block_pattern, content, re.DOTALL)
133
-    if not block_match:
134
-        print(f"Block with identifier '{identifier}' not found.")
135
-        return False
136
-
137
-    # Extract the parts of the block
138
-    start_marker = block_match.group(1)
139
-    block_content = block_match.group(2).strip()
140
-    end_marker = block_match.group(3)
141
-
142
-    # Append the new item to the block content
143
-    formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
144
-    print(formatted_item)
145
-    updated_block_content = f"{block_content}\n{formatted_item}"
146
-
147
-    # Reconstruct the content with the updated block
148
-    updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
149
-    content = content.replace(block_match.group(0), updated_content)
150
-
151
-    print(f"New item appended to '{identifier}' block.")
152
-    return content
153
-
154
-def find_username_id(username):
155
-    """
156
-    Finds the ID for a given username from the username_id file.
157
-
158
-    Args:
159
-        username (str): The username to look up.
160
-        file_path (str): Path to the username_id file.
161
-
162
-    Returns:
163
-        str: The ID of the username, or None if not found.
164
-    """
165
-    try:
166
-        with open("username_id", "r", encoding="utf-8") as file:
167
-            for line in file:
168
-                if line.strip():  # Skip empty lines
169
-                    user, user_id = line.strip().split(":")
170
-                    if user == username:
171
-                        return user_id
172
-        print(f"Username '{username}' not found.")
173
-        return None
174
-    except FileNotFoundError:
175
-        print(f"File not found: {file_path}")
176
-        return None
177
-    except Exception as e:
178
-        print(f"An error occurred: {e}")
179
-        return None
180
-
181
-def generate_evoluz_link(username_id, task_id) :
182
-    team_id = f"teamID={username_id}"
183
-    update_team_id = f"updateTeamId={task_id}" 
184
-
185
-    full_url = f"{EVOLUZ_URL}{team_id}&{update_team_id}"
186
-    return full_url
187
-
188
-def find_task_type(branch_name) : 
189
-    """
190
-    Extracts the branch type and task ID from the branch name.
191
-
192
-    Args:
193
-        branch_name (str): The branch name (e.g., "feature/1234_task_name").
194
-
195
-    Returns:
196
-        dict: A dictionary with 'branch_type' and 'task_id', or None if invalid format.
197
-    """
198
-    try:
199
-        # Split the branch name by "/"
200
-        parts = branch_name.split("/")
201
-        if len(parts) >= 2:
202
-            branch_type = parts[0]  # The first part is the branch type
203
-            task = parts[1]
204
-            task_id = task.split("_")[0]  # The second part is the task ID
205
-            return {"task_type": branch_type, "task_id": task_id}
206
-        else:
207
-            print(f"Invalid branch name format: {branch_name}")
208
-            return None
209
-    except Exception as e:
210
-        print(f"Error extracting task type: {e}")
211
-        return None
10
+EVOLUZ_URL = "http://192.168.1.33:5050/apps/team/detail?"
212
 
11
 
213
 @app.route('/webhook', methods=['POST'])
12
 @app.route('/webhook', methods=['POST'])
214
 def handle_webhook():
13
 def handle_webhook():
215
-    if request.method != 'POST':
216
-        return 'Invalid request method', 405
217
-    
218
     try:
14
     try:
219
         payload = request.get_json()
15
         payload = request.get_json()
220
         if not payload:
16
         if not payload:
221
             return 'Invalid JSON payload', 400
17
             return 'Invalid JSON payload', 400
222
 
18
 
223
-        # Handle Pull Request Event
224
         if 'pull_request' in payload:
19
         if 'pull_request' in payload:
225
             pr = payload.get('pull_request', {})
20
             pr = payload.get('pull_request', {})
226
-
227
             pr_title = pr.get('title')
21
             pr_title = pr.get('title')
228
             pr_username = pr.get('user').get('username')
22
             pr_username = pr.get('user').get('username')
229
-            pr_username_id = find_username_id(pr_username)
230
             pr_branch = pr.get('head_branch')
23
             pr_branch = pr.get('head_branch')
231
             pr_branch_data = find_task_type(pr_branch)
24
             pr_branch_data = find_task_type(pr_branch)
232
-            
25
+
233
             pr_data = {
26
             pr_data = {
234
                 'title': pr_title,
27
                 'title': pr_title,
235
                 'username': pr_username.capitalize(),
28
                 'username': pr_username.capitalize(),
236
                 'branch': pr_branch,
29
                 'branch': pr_branch,
237
                 'task_type': pr_branch_data.get("task_type"),
30
                 'task_type': pr_branch_data.get("task_type"),
238
                 'user_id': find_username_id(pr_username),
31
                 'user_id': find_username_id(pr_username),
239
-                'link_evoluz': generate_evoluz_link(pr_username_id, pr_branch_data.get("task_id"))
32
+                'link_evoluz': generate_evoluz_link(find_username_id(pr_username), pr_branch_data.get("task_id"), EVOLUZ_URL)
240
             }
33
             }
241
 
34
 
242
-            print(f"Received PR: {pr_title}")
243
-
244
-            # Update milestone with PR title
245
             if update_milestone(pr_data):
35
             if update_milestone(pr_data):
246
-                print("Milestone updated with PR title.")
36
+                print("Milestone updated successfully.")
247
             else:
37
             else:
248
                 print("Failed to update milestone.")
38
                 print("Failed to update milestone.")
249
 
39
 
250
         return jsonify({'status': 'success'}), 200
40
         return jsonify({'status': 'success'}), 200
251
-
252
     except Exception as e:
41
     except Exception as e:
253
         print(f"Error: {e}")
42
         print(f"Error: {e}")
254
         return 'Internal server error', 500
43
         return 'Internal server error', 500
255
 
44
 
256
 if __name__ == '__main__':
45
 if __name__ == '__main__':
257
-    if login():  # Attempt login at startup
46
+    if login(USERNAME, PASSWORD):
258
         print("Initial login successful.")
47
         print("Initial login successful.")
259
-        milestone = get_milestone()
260
-        print(milestone)
261
     else:
48
     else:
262
         print("Initial login failed.")
49
         print("Initial login failed.")
263
-    app.run(port=4001)
50
+    app.run(port=4001)

+ 34 - 0
milestone.py

@@ -0,0 +1,34 @@
1
+import re
2
+import html
3
+
4
+def get_milestone(session, edit_milestone_url):
5
+    """Fetch milestone details."""
6
+    response = session.get(edit_milestone_url)
7
+    if response.status_code == 200:
8
+        title_match = re.search(r'<input[^>]*name=["\']title["\'][^>]*value=["\'](.*?)["\']', response.text, re.IGNORECASE)
9
+        content_match = re.search(r'<textarea[^>]*name=["\']content["\'][^>]*>(.*?)</textarea>', response.text, re.IGNORECASE | re.DOTALL)
10
+        title = title_match.group(1) if title_match else None
11
+        content = content_match.group(1).strip() if content_match else None
12
+        return {'title': title, 'content': content}
13
+    else:
14
+        print(f"Failed to fetch milestone: {response.status_code}")
15
+        return None
16
+
17
+def append_item_to_identifier(encoded_content, identifier, new_item):
18
+    """Append a new item to the block identified by the given identifier."""
19
+    content = html.unescape(encoded_content)
20
+    block_pattern = rf'(<!--\s*{identifier}\s*-->)(.*?)(<!--\s*/{identifier}\s*-->)'
21
+    block_match = re.search(block_pattern, content, re.DOTALL)
22
+    if not block_match:
23
+        print(f"Block with identifier '{identifier}' not found.")
24
+        return False
25
+
26
+    start_marker = block_match.group(1)
27
+    block_content = block_match.group(2).strip()
28
+    end_marker = block_match.group(3)
29
+
30
+    formatted_item = f"- [x] {new_item['username']}: [{new_item['title']}]({new_item['link_evoluz']})"
31
+    updated_block_content = f"{block_content}\n{formatted_item}"
32
+
33
+    updated_content = f"{start_marker}\n{updated_block_content}\n{end_marker}"
34
+    return updated_content

+ 38 - 0
utils.py

@@ -0,0 +1,38 @@
1
+def find_username_id(username, file_path="username_id"):
2
+    """Find the ID for a given username from the username_id file."""
3
+    try:
4
+        with open(file_path, "r", encoding="utf-8") as file:
5
+            for line in file:
6
+                if line.strip():
7
+                    user, user_id = line.strip().split(":")
8
+                    if user == username:
9
+                        return user_id
10
+        print(f"Username '{username}' not found.")
11
+        return None
12
+    except FileNotFoundError:
13
+        print(f"File not found: {file_path}")
14
+        return None
15
+    except Exception as e:
16
+        print(f"An error occurred: {e}")
17
+        return None
18
+
19
+def generate_evoluz_link(username_id, task_id, base_url):
20
+    """Generate Evoluz link."""
21
+    team_id = f"teamID={username_id}"
22
+    update_team_id = f"updateTeamId={task_id}"
23
+    return f"{base_url}{team_id}&{update_team_id}"
24
+
25
+def find_task_type(branch_name):
26
+    """Extract branch type and task ID from branch name."""
27
+    try:
28
+        parts = branch_name.split("/")
29
+        if len(parts) >= 2:
30
+            branch_type = parts[0]
31
+            task_id = parts[1].split("_")[0]
32
+            return {"task_type": branch_type, "task_id": task_id}
33
+        else:
34
+            print(f"Invalid branch name format: {branch_name}")
35
+            return None
36
+    except Exception as e:
37
+        print(f"Error extracting task type: {e}")
38
+        return None