Compare commits

..

No commits in common. "v1.0" and "v2.0" have entirely different histories.
v1.0 ... v2.0

22 changed files with 452 additions and 1228 deletions

3
.gitignore vendored
View file

@ -1,2 +1,3 @@
__pycache__ __pycache__
*.session *.session
result

View file

@ -1 +0,0 @@
worker: python3 -m bot

25
README.md Normal file
View file

@ -0,0 +1,25 @@
<img src="https://cdn-icons-png.flaticon.com/512/207/207190.png" align="right" alt="Todo Icon" width="150" height="150">
# IGNOU Telegram Bot ✔️
*A Telegram bot built in Python to help IGNOU students check their results and receive notifications as soon as they are released.*
[![GitHub issues](https://img.shields.io/github/issues/thedevone/IGNOU-Telegram-Bot?color=red&style=for-the-badge)](https://github.com/thedevone/IGNOU-Telegram-Bot/issues)
[![GitHub stars](https://img.shields.io/github/stars/thedevone/IGNOU-Telegram-Bot?color=green&style=for-the-badge)](https://github.com/thedevone/IGNOU-Telegram-Bot/stargazers)
# Features 🌟
> Some features that bot comes with:
- A beautiful result UI
- Provides fast and accurate results through web scraping mechanism
- Designed to be easy to use for students
# Screenshots 🖼️
![Screenshot](https://user-images.githubusercontent.com/85359493/228441589-93ee5a19-9681-4380-9c8d-1476a8171ba2.png)
![image](https://user-images.githubusercontent.com/85359493/228441765-1be35ad9-810f-466a-aebb-9d6f7b34783b.png)
![image](https://user-images.githubusercontent.com/85359493/228442303-27c048b6-4b27-4f00-ad92-f5e5b30900b9.png)
# Contribution 🤝
- Want to contribute? Feel free to open a PR! 😸

View file

@ -1,18 +1,65 @@
from .ignou import Ignou import os
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from bot.database import Database from pyrogram import Client, filters
from pyrogram.enums import ChatType, ParseMode
from .ignou import IgnouResult, ResultNotFoundError
try:
BOT_TOKEN = os.environ["BOT_TOKEN"]
except KeyError:
print("BOT_TOKEN not found in environment variables.")
exit(1)
app = Client(
":ignou:",
api_id=os.environ.get("API_ID", "21724"),
api_hash=os.environ.get("API_HASH", "3e0cb5efcd52300aec5994fdfc5bdc16"),
bot_token=BOT_TOKEN,
in_memory=True
)
ignou = IgnouResult()
@app.on_message(filters.command("start"))
async def start(_, message):
await message.reply_text(
f"Hi {message.from_user.mention}!\nSend me your program code with enrollment number to get your grade card result.",
)
@app.on_message(filters.regex(r"([a-zA-z]+)\s*?(\d+$)"))
async def get_grade_result(_, message):
if message.chat.type == ChatType.GROUP or message.chat.type == ChatType.SUPERGROUP:
await message.delete()
program_id, enrollment_no = message.matches[0].groups()
try:
footer = f"<strong>Result checked by {message.from_user.mention(message.from_user.first_name)}</strong>"
except AttributeError:
footer = ""
try:
result = await ignou.gradeResultData(program_id, enrollment_no)
except ResultNotFoundError:
await message.reply_text("Grade Card Result not found.")
return
if message.chat.type == ChatType.GROUP or message.chat.type == ChatType.SUPERGROUP:
await message.reply_text(
f"<pre>{result['table']}</pre>\n{footer}",
parse_mode=ParseMode.HTML,
)
elif message.chat.type == ChatType.PRIVATE:
await message.reply_text(
f"<pre>{result['header']}{result['table']}</pre>",
parse_mode=ParseMode.HTML,
)
db = Database()
client = Ignou()
if __name__ == "__main__": if __name__ == "__main__":
app.run()
# schedular for checking result
scheduler = AsyncIOScheduler()
scheduler.add_job(client.tee_crawler, 'cron', day_of_week='0-6', hour="0-19", minute='0-59/20')
scheduler.add_job(client.grade_crawler, 'cron', day_of_week='0-6', hour="0-19", minute='0-59/25')
scheduler.start()
client.run()

View file

@ -1,15 +0,0 @@
import os
class Config:
NAME = "IGNOU"
USERNAME = os.environ.get("USERNAME")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
SUDO_CHAT = os.environ.get("SUDO_CHAT")
SUDO_ADMIN = os.environ.get("SUDO_ADMIN","").split(",")
DATABASE_URL = os.environ.get("DB_URL","localhost:27017")
SESSION_NAME = os.environ.get("SESSION_NAME")
FOOTER_CREDIT = f"\n<b>Result fetched using {USERNAME}</b>"
HELP_URL = os.environ.get("HELP_URL")

View file

@ -1 +0,0 @@
from .database import Database

View file

@ -1,114 +0,0 @@
import datetime
import motor.motor_asyncio
from bot.config import Config
class Singleton(type):
__instances__ = {}
def __call__(cls, *args, **kwargs):
if cls not in cls.__instances__:
cls.__instances__[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls.__instances__[cls]
class Database(metaclass=Singleton):
def __init__(self):
self._client = motor.motor_asyncio.AsyncIOMotorClient(Config.DATABASE_URL)
self.db = self._client[Config.SESSION_NAME]
self.user = self.db.user
self.crawler = self.db.crawler
self.site = self.db.site
async def get_student(self, _id):
return await self.crawler.find_one({"_id" : _id})
async def get_user(self, id):
user = await self.user.find_one({"_id": id})
return user
def get_time(self):
return datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
async def add_user(self, _id, name):
user = dict(
_id=_id,
name = name,
join_date=datetime.date.today().isoformat(),
action = dict(
last_used_on=self.get_time(),
),
role_status=dict(
is_admin = False
)
)
await self.user.insert_one(user)
async def is_user_exist(self, id):
user = await self.get_user(id)
return True if user else False
async def total_users_count(self):
count = await self.user.count_documents({})
return count
async def total_crawlers_count(self):
count = await self.crawler.count_documents({})
return count
async def get_all_users(self):
all_users = self.user.find({})
return all_users
async def get_all_crawlers(self):
all_users = self.crawler.find({})
return all_users
async def get_last_used_on(self, _id):
user = await self.get_user(_id)
return user.get("last_used_on", datetime.date.today().isoformat())
# Last Used Time
async def update_last_used_on(self, _id):
await self.user.update_one(
{"_id": _id}, {"$set": {"action.last_used_on": self.get_time()}}
)
# Get When Last Time Used
async def get_last_used_on(self, _id):
user = await self.get_user(_id)
return user.last_used_on
# Last Action
async def update_last_action(self,_id, query):
query['last_used_on'] = self.get_time()
self.user.update_one(
{"_id": _id},
{"$set": {"action": query}})
async def update(self,col, _id, info_dict):
await col.update_one(
{"_id" : _id},
info_dict)
async def find(self, col,_id,info_dict = {}):
return await col.find_one(
{"_id" : _id },
info_dict)
async def insert(self, col, info_dict):
return await col.insert_one(info_dict)
async def get_site_update(self,site):
site_update = await self.site.find_one({"_id": site})
if site_update is None:
await self.site.insert_one({"_id": site})
return {}
return site_update

View file

View file

@ -1,55 +0,0 @@
class User:
def __init__(self, user):
if user is None:
user = {}
self._id = user.get("_id")
self.name = user.get("name")
self.course = user.get("action", {}).get("course")
self.myenrollment = user.get("myenrollment")
self.following = user.get("following", {})
self.enrollment = user.get("action", {}).get("enrollment")
self.last_used_on = user.get("action", {}).get("last_used_on")
self.is_admin = user.get("role_status", {}).get("is_admin", False)
self.user = user
def dict(self):
return self.user
#
class Student:
def __init__(self, student):
if student is None:
student = {}
self._id = student.get("_id")
self.name = student.get("name")
self.course = student.get("course")
self.myenrollment = student.get("myenrollment")
self.followers = student.get("followers", {})
self.enrollment = student.get("action", {}).get("enrollment")
self.last_used_on = student.get("action", {}).get("last_used_on")
self.is_admin = student.get("role_status", {}).get("is_admin", False)
self.grade = self.Grade(student.get("grade"))
self.tee = self.Tee(student.get("tee"))
self.student = student
def dict(self):
return self.student
# grade card
class Grade:
def __init__(self, grade):
self.passed = grade.get("count",{}).get("passed", 0)
self.failed = grade.get("count",{}).get("failed", 0)
self.checked = grade.get("checked")
# tee card
class Tee:
def __init__(self, tee):
self.count = tee.get("count",0)
self.checked = tee.get("checked")

View file

@ -1,77 +0,0 @@
import requests
import os
class IgnouBooks:
def __init__(self,course=None,subject='None') -> None:
self.headers = {'User-Agent':'Dalvik/2.1.0 (Linux; U; Android 10; Redmi Note 7 Pro Build/QQ3A.200605.001)','b8S3lCfLoGo4DVAzNQnl5OpMyALyq8e2WpWbZTMlwZ5iPHr.UaVNW':'J0lta3zy@19' }
self.course = course.upper() # Course Name like BCA or MCA
self.subject = subject.upper() # If subject code passed like BCS011 or MCS023
self.pId = '' # IGNOU course id like BCA 23 and MCA 25
self.cId = '' # IGNOU Course subject id list
self.courseList = '' # get all subject of course
def courseCode(self):
courses = ['https://egkapp.ignouonline.ac.in/api/programmes/type/Bachelor','https://egkapp.ignouonline.ac.in/api/programmes/type/Master']
for courseurl in courses:
response = requests.get(courseurl,headers = self.headers).json()
for res in response:
if '(' not in res['pCode']:
if self.course == res['pCode'].upper():
self.pId = res['pId']
return
elif '(' in res['pCode'] and 'English' == res['pMedium']:
if self.course == res['pCode'].split("(")[0].upper():
self.pId = res['pId']
return
def subjectCode(self):
self.courseCode()
subjects = 'https://egkapp.ignouonline.ac.in/api/courses/p/' + str(self.pId)
response = requests.get(subjects,headers = self.headers).json()
substring ='' # 〽️
for res in response:
fullname = res['cCode']
con = any(map(str.isdigit, fullname))
if not con:
con = False
continue
if '-' in fullname:
first = fullname.split('-')[0]
last = str(int(fullname.split('-')[1])//1)
finalname = first + last
if finalname == self.subject:
self.cId = res['cId']
title = res['cTitle']
coursename = res['cpId']['pCode']
substring += f'{title} \n〽️ `/book {coursename} {finalname}`\n\n'
self.courseList = substring
def getCourseSubjectlist(self):
self.courseCode()
self.subjectCode()
return self.courseList
def getDownload(self):
self.courseCode()
self.subjectCode()
downloads = 'https://egkapp.ignouonline.ac.in/api/blocks/c/' + str(self.cId)
response = requests.get(downloads,headers = self.headers).json()
namelist = []
for res in response:
downurl = 'https://egkapp.ignouonline.ac.in/api/blocks/download/' + str(res['bId'])
downloadresponse = requests.get(downurl,headers = self.headers)
open(f"{res['bCode']} [{self.subject}][@IGNOUpyBoT].pdf", 'wb').write(downloadresponse.content)
namelist.append(f"{res['bCode']} [{self.subject}][@IGNOUpyBoT].pdf")
print(f"{res['bCode']} [{self.subject}][@IGNOUpyBoT].pdf")
return namelist

View file

@ -1,233 +0,0 @@
import asyncio
from bot.helper.ignouresult import IgnouResult
from bot.database import Database
from bot.helper.extractor import Student
import datetime
import time
from pyrogram import Client
from pyrogram.errors import FloodWait,PeerIdInvalid
db = Database()
class IgnouCrawler:
def __init__(self,client) -> None:
self.db = db
self.client = client
self.greeted = {
"grade" : {},
"tee" : {}
}
self.greet_msg = {
"grade" : "Grade Card Updated Today ",
"tee" : "One more result out Today 🤒"
}
self.todayDate = datetime.datetime.today().strftime('%B %d, %Y')
async def greet_user(self, result_type, user_id):
if not self.greeted.get(result_type).get(user_id):
self.greeted[result_type][user_id] = True
try:
await self.client.send_message(
user_id,
f"<b>{self.greet_msg.get(result_type)}👩🏻‍🎨</b>",parse_mode='html')
except FloodWait as e:
time.sleep(e.x)
await self.client.send_message(
user_id,
f"<b>{self.greet_msg.get(result_type)}👩🏻‍🎨</b>",parse_mode='html')
except PeerIdInvalid as e:
print(f"{user_id} -> {e}")
async def teeCrawl(self, student: Student):
data = IgnouResult('roshan'+ student._id).teeResultString()
if data and student.tee.count != data.get("count"):
title = '<pre>' + f'Name : {student.name} -> {student.course}\n' + '</pre>'
for user_id in student.followers:
if not self.greeted.get("tee").get(user_id):
await self.greet_user("tee", user_id)
try:
await self.client.send_message(
chat_id=user_id,
text= title + data.get("result"),
parse_mode='html')
except FloodWait as e:
time.sleep(e.x)
await self.client.send_message(
chat_id=user_id,
text= title + data.get("result"),
parse_mode='html')
except PeerIdInvalid as e:
print(f"{user_id} -> {e}")
await self.db.update(
self.db.crawler,
student._id,
{
"$set": {
"tee.count": data.get("count"),
"tee.checked": self.todayDate
}
}
)
async def teeTask(self):
print("Tee Result Crawling : {}".format(datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")))
await self.db.update(
self.db.site,
"ignou",
{
"$set" : {
"tee_checked": datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")
}
}
)
tasks = []
students = await db.get_all_crawlers()
self.greeted['tee'] = {}
# Check first TEE SIte Updated or not
site = await IgnouResult().teeCardUpdated()
if not site.get("updated"):
# print("Tee card Site not Updated")
return
# if site updated check for results
async for student in students:
student_info = Student(student)
if student_info.tee.checked == self.todayDate:
continue
tasks.append(
asyncio.create_task(
self.teeCrawl(student_info)
)
)
await asyncio.gather(*tasks)
await self.db.update(
self.db.site,
"ignou",
{
"$set" : {
"tee" : site.get("date"),
"tee_checked": datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")
}
}
)
async def gradeCrawl(self,student: Student):
data = IgnouResult(student.course + student._id).gradeResultString()
grade_passed = data.get("json", {}).get("count", {}).get("passed", 0)
grade_failed = data.get("json", {}).get("count", {}).get("failed", 0)
if data and (int(student.grade.passed) != grade_passed or int(student.grade.failed) != grade_failed) or True:
for user_id in student.followers:
if not self.greeted.get("grade").get(user_id):
await self.greet_user("grade", user_id)
try:
await self.client.send_message(
chat_id= user_id,
text = data.get("result"),
parse_mode ='html')
except FloodWait as e:
time.sleep(e.x)
await self.client.send_message(
chat_id= user_id,
text = data.get("result"),
parse_mode ='html')
except PeerIdInvalid as e:
print(f"{user_id} -> {e}")
await self.db.update(
self.db.crawler,
student._id,
{
"$set" : {
"grade.count.passed": grade_passed,
"grade.count.failed": grade_failed,
"grade.checked": self.todayDate
}
}
)
async def gradeTask(self):
print("Grade Card Crawling : {}".format(datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")))
await self.db.update(
self.db.site,
"ignou",
{
"$set": {
"grade_checked": datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")
}
}
)
tasks = []
students = await db.get_all_crawlers()
self.greeted['grade'] = {}
# Check first Grade Site Updated or not
site = await IgnouResult().gradeCardUpdated()
if not site.get("updated"):
# print("Grade card Site not Updated")
return
# if site updated check for results
async for student in students:
student_info = Student(student)
if student_info.grade.checked == self.todayDate:
continue
tasks.append(
asyncio.create_task(
self.gradeCrawl(student_info)
)
)
await asyncio.gather(*tasks)
await self.db.update(
self.db.site,
"ignou",
{
"$set" : {
"grade": site.get("date"),
"grade_checked": datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S")
}
}
)
if __name__ == "__main__":
pass

View file

@ -1,218 +0,0 @@
import requests
from bs4 import BeautifulSoup
from prettytable import PrettyTable
import re
from bot.database import Database
db = Database()
class IgnouResult:
def __init__(self,text='Bca 197940316') -> None:
self.text = text.upper().strip()
self.enrollmentNo = ''
self.courseId = ''
self.extractCourseIDandEnrollmentNo()
self.sem = 'Dec20'
self.session = requests.Session()
self.session.headers.update({
'Connection': 'keep-alive',
'Cache-Control': 'max-age=0',
'Upgrade-Insecure-Requests': '1',
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Mobile Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Sec-GPC': '1',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Language': 'en-US,en;q=0.9,ru;q=0.8',
}
)
def extractCourseIDandEnrollmentNo(self):
self.courseId = re.findall("\D+", self.text)[0].strip()
self.enrollmentNo = re.findall('\d+', self.text)[0].strip()
return self.courseId,self.enrollmentNo
def teeResultJson(self):
data = {
'eno': self.enrollmentNo,
'myhide': 'OK'
}
teeUrl = f'https://termendresult.ignou.ac.in/TermEnd{self.sem}/TermEnd{self.sem}.asp'
response = self.session.post(teeUrl, data=data)
if 'Not found' in response.text:
return {'status':'notok'} # result not found
soup = BeautifulSoup(response.text,'lxml')
trs = soup.find_all('tr')[1:]
resultJson = list()
for tr in trs:
info = tr.find_all("td")
data = list()
for index,col in enumerate(info):
if index == len(info) - 1:
break
data.append(info[index].text.strip())
resultJson.append(data)
return {'result' : resultJson,'count' : len(trs),'html':response.text, 'status':'ok'}
def gradeResultJson(self):
data = {
'Program': self.courseId,
'eno': self.enrollmentNo,
'submit': 'Submit',
'hidden_submit': 'OK'
}
if self.courseId in ['BCA', 'MCA', 'MP', 'MBP', 'PGDHRM', 'PGDFM', 'PGDOM', 'PGDMM', 'PGDFMP']:
url = 'https://gradecard.ignou.ac.in/gradecardM/Result.asp'
elif self.courseId in ['ASSSO', 'BA', 'BCOM', 'BDP', 'BSC']:
url = 'https://gradecard.ignou.ac.in/gradecardB/Result.asp'
else:
url = 'https://gradecard.ignou.ac.in/gradecardR/Result.asp'
response = self.session.post(url, data=data)
soup = BeautifulSoup(response.text,'lxml')
if 'Not found' in response.text or response.status_code != 200:
return {'status':'notok'} # result not found
trs = soup.find_all("tr")[1:]
resultJson = list()
student = dict()
btags = soup.find_all('b')[3:6]
for n,ba in enumerate(btags,1):
if n == 1:
student['enrollment'] = ba.text.split(":")[1].strip()
elif n == 2:
student['name'] = ba.text.split(":")[1].strip()
else:
student['course'] = ba.text.split(":")[1].strip()
count = dict()
count['passed'] = 0
count['failed'] = 0
count['total'] = 0
for tr in trs:
data = list()
td = tr.find_all("td")
data.append(td[0].string.strip())
data.append(td[1].string.strip())
data.append(td[2].string.strip())
data.append(td[6].string.strip())
status = True if 'Not' not in td[7].string.strip() else False # ✅ ❌
data.append(status)
if status:
count['passed'] += 1
else:
count['failed'] += 1
count['total'] += 1
resultJson.append(data)
return {'result' : resultJson,'student' : student,'count' : count,'html':response.text,'status':'ok'}
def gradeResultString(self):
x = PrettyTable()
x.field_names = ["Course","Asign","Lab","Term","Status"]
gradeJson = self.gradeResultJson()
if gradeJson['status'] != 'ok':
return False
header = 'Name : {}\n -> {} -> {}\n'.format(gradeJson['student']['name'],self.courseId,self.enrollmentNo)
for sub in gradeJson['result']:
tick = '' if sub[-1] else ''
sub[-1] = tick
x.add_row(sub)
footer = ['count','T:{}'.format(gradeJson['count']['total']),
'P:{}'.format(gradeJson['count']['passed']),
'F:{}'.format(gradeJson['count']['failed']),
'L:{}'.format(gradeJson['count']['total']-gradeJson['count']['passed'])]
x.add_row(footer)
return {
'enrollmentno' : self.enrollmentNo,
'course' : self.courseId,
'result' : '<pre>' + header + x.get_string() + '</pre>',
'json' : gradeJson
}
def teeResultString(self):
x = PrettyTable()
header = 'Enrollment no : {} ({})\n'.format(self.enrollmentNo,self.sem)
x.field_names = ["Course","Marks","Max","Month","Updation"]
teeJson = self.teeResultJson()
if teeJson['status'] != 'ok':
return False
for sub in teeJson['result']:
x.add_row(sub)
return {
'enrollmentno' : self.enrollmentNo,
'course' : self.courseId,
'count' : teeJson['count'],
'result' : '<pre>' + header + x.get_string() + '</pre>',
'json' : teeJson
}
async def gradeCardUpdated(self):
response = self.gradeResultJson()
if response['status'] != 'ok':
return False
updated = re.findall('([\w]+ ?[\d]+, ?[\d]+)',response['html'])[0]
last_updateed = await db.get_site_update("ignou")
if updated != last_updateed.get("grade",''):
return {"date" : updated,"updated" : True}
return {"date" : updated,"updated" : False}
async def teeCardUpdated(self):
teeUrl = f'https://termendresult.ignou.ac.in/TermEnd{self.sem}/TermEnd{self.sem}.asp'
response = self.session.post(teeUrl)
if response.status_code != 200:
return False
updated = re.findall('([\w]+ ?[\d]+, ?[\d]+)',response.text)[0]
# todayDate = datetime.datetime.today().strftime('%B %d, %Y')
last_updateed = await db.get_site_update("ignou")
if updated != last_updateed.get("tee",''):
return {"date" : updated,"updated" : True}
return {"date" : updated,"updated" : False}

View file

@ -1,24 +1,224 @@
from pyrogram import Client from collections import OrderedDict
from bot.config import Config from collections.abc import Mapping
from bot.helper.ignoucrawler import IgnouCrawler
import httpx
from bs4 import BeautifulSoup
from prettytable import PrettyTable
class DotDict(OrderedDict):
"""
Quick and dirty implementation of a dot-able dict, which allows access and
assignment via object properties rather than dict indexing.
"""
def __init__(self, *args, **kwargs):
# we could just call super(DotDict, self).__init__(*args, **kwargs)
# but that won't get us nested dotdict objects
od = OrderedDict(*args, **kwargs)
for key, val in od.items():
if isinstance(val, Mapping):
value = DotDict(val)
else:
value = val
self[key] = value
def __getattr__(self, k):
return self.get(k, "-")
__setattr__ = OrderedDict.__setitem__
class Ignou(Client): class ResultNotFoundError(Exception):
def __init__(self): pass
super().__init__(
session_name=Config.SESSION_NAME,
bot_token=Config.BOT_TOKEN, class IgnouResult:
api_id=Config.API_ID, def __init__(self) -> None:
api_hash=Config.API_HASH, transport = httpx.AsyncHTTPTransport(verify=False)
plugins=dict(root="bot/plugins"), self.session: httpx.AsyncClient = httpx.AsyncClient(transport=transport)
async def get_grade_result(self, program_id: str, enrollment_no: int):
program_id = program_id.upper()
params = {
"prog": program_id,
"eno": enrollment_no,
}
if program_id in [
"BCA",
"MCA",
"MP",
"MBP",
"PGDHRM",
"PGDFM",
"PGDOM",
"PGDMM",
"PGDFMP",
]:
params["type"] = 1
elif program_id in ["ASSSO", "BA", "BCOM", "BDP", "BSC"]:
params["type"] = 2
elif program_id in [
"BAEGH",
"BAG",
"BAHDH",
"BAHIH",
"BAPAH",
"BAPCH",
"BAPSH",
"BASOH",
"BAVTM",
"BCOMG",
"BSCANH",
"BSCBCH",
"BSCG",
"BSWG",
]:
params["type"] = 4
else:
params["type"] = 3
grade_card_page = "https://gradecard.ignou.ac.in/gradecard/view_gradecard.aspx"
response = await self.session.get(grade_card_page, params=params)
soup = BeautifulSoup(response.text, "lxml")
name = soup.find(id="ctl00_ContentPlaceHolder1_lblDispname").string
try:
trs = soup.find(string="COURSE").findParent("table")
except AttributeError:
raise ResultNotFoundError
trs = soup.find(string="COURSE").findParent("table").find_all("tr")
data = DotDict()
data.enrollment_no = enrollment_no
data.program_id = program_id
data.name = name
courses = []
data.courses = courses
total = DotDict()
data.total = total
total.total_passed = 0
total.total_failed = 0
total.total_marks = 0
total.total_obtained_marks = 0
column_index = {}
# get index of coulmn by name
cols_list = [data.get_text() for data in trs[0].find_all("th")]
for index, col in enumerate(cols_list):
if "COURSE" in col:
column_index["COURSE"] = index
elif "ASG" in col.upper():
column_index["ASIGN"] = index
elif "THEORY" in col:
column_index["THEORY"] = index
elif "PRACTICAL" in col:
column_index["PRACTICAL"] = index
elif "STATUS" in col:
column_index["STATUS"] = index
for tr in trs[1:-1]:
course = DotDict()
course.max_marks = 100
total.total_marks += course.max_marks
td = tr.find_all("td")
# course name
course.name = td[column_index["COURSE"]].string.strip()
# assignments marks
if assign := td[column_index["ASIGN"]].string.strip():
if assign.isnumeric():
course.assignment_marks = int(assign)
total.total_obtained_marks += course.assignment_marks
# theory marks
try:
if theory := td[column_index["THEORY"]].string.strip():
if theory.isnumeric():
course.theory_marks = int(theory)
total.total_obtained_marks += course.theory_marks
except Exception:
course.theory_marks = "-"
# lab marks
try:
if lab := td[column_index["PRACTICAL"]].string.strip():
if lab.isnumeric():
course.lab_marks = int(lab)
total.total_obtained_marks += course.lab_marks
except Exception:
course.lab_marks = "-"
# Status # ✅ ❌
if "NOT" not in td[column_index["STATUS"]].string.strip():
course.status = True
total.total_passed += 1
else:
course.status = False
total.total_failed += 1
courses.append(course)
total.total_courses = len(courses)
return {
"name": name,
"enrollment_no": enrollment_no,
"program_id": program_id,
"courses": courses,
**total,
}
async def gradeResultData(
self,
program_id: str,
enrollment_no: int,
):
data = DotDict(await self.get_grade_result(program_id, enrollment_no))
x = PrettyTable()
x.padding_width = 0
x.field_names = ["Course", "Asign", "Lab", "Term", "Status"]
header = "Name : {}\nProg : {} [{}]\n".format(
data.name, data.program_id, data.enrollment_no
) )
self.IgnouCrawler = IgnouCrawler(self) for course in data.courses:
tick = "" if course.status else ""
async def grade_crawler(self): x.add_row(
await self.IgnouCrawler.gradeTask() [
course.name,
course.assignment_marks,
course.lab_marks,
course.theory_marks,
tick,
]
)
async def tee_crawler(self): x.add_row(
await self.IgnouCrawler.teeTask() [
"Total",
"T:{}".format(data.total_courses),
"-", # data.total_obtained_marks,
"-", # data.total_marks,
f"[{data.total_passed}/{data.total_failed}]",
]
)
return {"header": header, "table": x.get_string()}

View file

@ -1,33 +0,0 @@
from pyrogram import Client
from pyrogram import filters
from bot.helper.ignoubooks import IgnouBooks
from bot.config import Config
# Completed Add DB Support
@Client.on_message(filters.command(['book']) & filters.chat(Config.SUDO_CHAT))
def books(client: Client, message):
text = message.text
try:
course_code = text.split(" ")[1].upper()
subject_code = text.split(" ")[2].upper()
except IndexError:
message.reply_text('Wrong Course Name or Subject Code 😐')
return
if books.find_one(subject_code):
# add code for database books for sending from cache
pass
if subject_code.upper() == 'LIST':
message.reply_text(IgnouBooks(course=course_code).get_courseSubjectlist())
else:
message.reply_text("please wait.. sending books")
files = IgnouBooks(course=course_code, subject=subject_code).getDownload()
for file in files:
client.send_document(
message.
chat,
id,
file,
caption=f'Downloaded using {Config.USERNAME}')

View file

@ -1,272 +0,0 @@
from pyrogram import Client
from pyrogram import filters
from pyrogram.types import (
InlineKeyboardMarkup,
InlineKeyboardButton,
CallbackQuery,
Message
)
import datetime
from bot.database import Database
from bot.config import Config
from bot.helper.ignouresult import IgnouResult
from bot.helper.extractor import User
db = Database()
# Completed
@Client.on_message(filters.regex('^(m|M)y'))
@Client.on_message(filters.command(['my']))
async def my(_, message):
user: User = User(await db.get_user(message.from_user.id))
if 'my' == message.text.lower():
if user.myenrollment:
await result_card(_, message)
else:
await message.reply_text("Set Your Enrollment using \n my 197xx00xx22")
return
else:
student = message.text.split()
try:
await db.update(
db.user,
message.from_user.id,
{"$set": {"myenrollment": student[1] + student[2]}})
await message.reply_text("Data Saved Successfully ")
except IndexError:
await message.reply_text("First Set your Enrollment using \n my course_code enrollment")
# Completed
@Client.on_message(filters.regex("^\D+\d{8,10}") | filters.regex("^\d{8,10}"))
async def result_card(_, message: Message):
get_course = False
user: User = User(await db.get_user(message.from_user.id))
if message.text.isnumeric():
if not user.course:
await message.reply("You must check once result with Course code \n example : bca 197xx00xx22")
return
else:
get_course = user.course
# elif 'my' in message.text.lower() or 'last' in message.text.lower():
# user: User = await db.get_user(message.from_user.id)
result_query = ''
if get_course:
result_query = get_course + message.text
elif 'my' in message.text.lower():
result_query = user.myenrollment
elif "last" in message.text.lower():
result_query = user.course + user.enrollment
else:
result_query = message.text
student: IgnouResult = IgnouResult(result_query)
result = student.gradeResultString()
if not result:
await message.reply_text('Enrollment no is not correct or \nGrade Card Site is Down 🤕')
return
if not user.following.get(student.enrollmentNo):
inline_keyboard = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"Add to Watch List 👀",
callback_data=f"add_{student.courseId}_{student.enrollmentNo}"
),
],
[
InlineKeyboardButton(
"Share this 🤖",
switch_inline_query=f"\nTry this Easy IGNOU Bot\n 👉🏻 {Config.USERNAME} \n\n Created by @r0sh7n"
)
]
]
)
else:
inline_keyboard = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"Share this 🤖",
switch_inline_query=f"\nTry this Easy IGNOU Bot\n 👉🏻 {Config.USERNAME} \n Created by @r0sh7n"
)
]
]
)
await message.reply_text(
result.get("result", '') + Config.FOOTER_CREDIT,
parse_mode='html',
reply_markup=inline_keyboard)
await db.update_last_action(
message.from_user.id,
{"course": student.courseId, "enrollment": student.enrollmentNo})
# Tee Result Card
result: IgnouResult = IgnouResult(result_query).teeResultString()
if result:
await message.reply_text(
result.get("result", '') + Config.FOOTER_CREDIT,
parse_mode='html')
@Client.on_callback_query(filters.regex("^add") | filters.regex("^remove"))
async def watch_list(_, callback_query: CallbackQuery):
"""
callback_query example:
add_bca_192112313
remove_bca_1092313
"""
_, course, enrollment = callback_query.data.split("_")
today_date = datetime.datetime.today().strftime('%B %d, %Y')
# info about follower who is following enrollment
user_dict = {
"username": callback_query.from_user.username,
"name": callback_query.from_user.first_name,
"_id": callback_query.from_user.id,
"added_on": datetime.date.today().isoformat()
}
# add enrollment in following list
if 'add' in callback_query.data:
# Grade Card
student = IgnouResult(course + enrollment)
result = student.gradeResultJson()
# if unable to fetch result from ignou site
if result.get("status") != "ok":
await callback_query.answer("Unable to fetch details\nTry after sometime ", show_alert=True)
return
# student data
student_info = result.get("student", {})
# result pass fail in dict(passed,failed)
count = result.get("count", {})
# student info in user following section
await db.update(
db.user,
callback_query.from_user.id,
{
"$set": {
f"following.{enrollment}": {
"name": student_info.get("name"),
"course": student_info.get("course")
}
}
})
# this dict gonna update in crawler db
info_dict = {
"_id": enrollment
}
# check if student is already in crawler db
if not await db.find(
db.crawler,
enrollment
):
# collection info then update in info_dict : Dict variable
student_db = {
"name": student_info.get("name", ""),
"course": course,
"grade": {
"count": {
"passed": count.get("passed", 0),
"failed": count.get("failed", 0)
},
"checked": today_date
},
}
# updating info_dict : Dict with student_db
info_dict.update(student_db)
# Tee Card Json
result = student.teeResultJson()
# list of out result in tee
count = result.get("count", 0)
# this for tee section in crawler
student_db = {
"tee": {
"count": count or 0,
"checked": today_date
},
}
# updating again info_dict with new tee student_db
info_dict.update(student_db)
# adding follower info in info_dict
info_dict.update(
{
"followers": {
str(callback_query.from_user.id): user_dict
}
}
)
# inserting first time student in crawlre db
await db.insert(
db.crawler,
info_dict
)
# if student is already in crawler db then
# just update following and followers secion
# in user db in crawler db
else:
await db.update(
db.crawler,
enrollment,
{
"$set": {
f"followers.{callback_query.from_user.id}": user_dict}
})
# remove inline_button from result
await callback_query.edit_message_reply_markup()
# push notification to user
await callback_query.answer(
f"{student_info.get('name')} Added in Watch List 🙃\n😇You will automatically receive result once it published in IGNOU 🌐",
show_alert=True)
# this conditon for remove followed user
# from crawler followers section and in user following section
elif "remove" in callback_query.data:
await db.update(
db.user,
callback_query.from_user.id,
{
"$unset": {
f"following.{enrollment}": ""
}
})
await db.update(
db.crawler,
enrollment,
{
"$unset": {
f"followers.{callback_query.from_user.id}": ""
}
}
)
await callback_query.answer("User Removed from Watchlist 👀")

View file

@ -1,141 +0,0 @@
import re
from pyrogram import Client
from pyrogram import filters
from pyrogram.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
CallbackQuery,
Message
)
from bot.database import Database
from bot.helper.extractor import User, Student
from bot.config import Config
from bot.plugins.result import result_card
db = Database()
@Client.on_message(filters.command(['start', 'help']))
async def start(_, message):
if 'start' in message.text.lower():
if not await db.is_user_exist(message.from_user.id):
await db.add_user(message.from_user.id, message.from_user.first_name)
await message.reply_text(f"Welcome , {message.from_user.first_name} 🥳 \n \
[Click here for more details 🔍]({Config.HELP_URL})", disable_web_page_preview=True)
elif 'help' in message.text.lower():
await message.reply_text(f"{Config.HELP_URL}")
await db.update_last_used_on(message.from_user.id)
@Client.on_message(filters.command(["stats"]))
async def stats(client: Client,message : Message):
user: User = User(await db.get_user(message.from_user.id))
total_user = await db.total_users_count()
total_crawler = await db.total_crawlers_count()
total_following = len(user.following)
msg = f"""
Hi, {message.from_user.first_name}
Your Stat 🙃
TG 🆔 : {message.from_user.id}
Following 🕵 : {total_following}
"""
if user.is_admin or message.from_user.id in Config.SUDO_ADMIN:
try:
user_enrollment_not = re.findall("\d+",user.myenrollment)[0]
user_crawler_info: Student = Student(await db.get_student(user_enrollment_not))
msg += f" Followers 👼 : {len(user_crawler_info.followers)}\n"
except (IndexError, AttributeError, TypeError):
pass
site_info = await db.get_site_update("ignou")
grade_checked = site_info.get("grade_checked","error in monitoring")
tee_checked = site_info.get("tee_checked","error in monitoring ")
msg += f"""
{Config.USERNAME} Stat 🤖
Total User 🙆: {total_user}
Result Monitoring 😎: {total_crawler}
👀 Last Grade Card Checked
🕗 -> {grade_checked}
Last Tee Result Check
🕗 -> {tee_checked}
"""
await message.reply_text(msg)
@Client.on_callback_query(filters.regex("^user"))
async def user_info(_, callback_query: CallbackQuery):
_, enrollment = callback_query.data.split("_")
user: User = User(await db.get_user(callback_query.from_user.id))
student: Student = Student(await db.find(
db.crawler,
enrollment,
{"_id" : 0}
))
followed_by = len(student.followers)
msg_string = f"""👩🏻‍🎓 {student.name}
🆔 {enrollment} ({student.course})
Grade Card : {student.grade.passed+student.grade.failed} {student.grade.passed} {student.grade.failed}
Grade Card Updated on {student.grade.checked}
"""
if user.is_admin or callback_query.from_user.id in Config.SUDO_ADMIN:
msg_string += f"Followed by {followed_by} 👀"
await callback_query.answer(msg_string, show_alert=True)
@Client.on_message(filters.command(['watchlist']))
async def followed_list(_, message: Message):
user: User = User(await db.get_user(message.from_user.id))
if len(user.following) == 0:
await message.reply_text("Not followed anyone")
return
buttons = []
for enrollment, usr in user.following.items():
row = [
InlineKeyboardButton(
usr.get("name").split()[0],
callback_data=f"user_{enrollment}"
),
InlineKeyboardButton(
"🗑",
callback_data=f"remove_{usr.get('course')}_{enrollment}"
),
]
buttons.append(
row
)
await message.reply_text(
"<b>👩🏻‍🎓 Users in 👀 Watchlist</b>",
reply_markup=InlineKeyboardMarkup(buttons),
parse_mode="html")
@Client.on_message(filters.command(['last']))
async def last_result_check(_, message):
user: User = User(await db.get_user(message.from_user.id))
if user.enrollment:
await result_card(_, message)
else:
await message.reply_text("No recent result checked")

44
flake.lock Normal file
View file

@ -0,0 +1,44 @@
{
"nodes": {
"flake-utils": {
"locked": {
"lastModified": 1652776076,
"narHash": "sha256-gzTw/v1vj4dOVbpBSJX4J0DwUR6LIyXo7/SuuTJp1kM=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "04c1b180862888302ddfb2e3ad9eaa63afc60cf8",
"type": "github"
},
"original": {
"owner": "numtide",
"ref": "v1.0.0",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1675757258,
"narHash": "sha256-pIRer8vdsoherlRKpzfnHbMZ5TsAcvRlXHCIaHkIUbg=",
"owner": "NixOs",
"repo": "nixpkgs",
"rev": "af96094e9b8eb162d70a84fa3b39f4b7a8b264d2",
"type": "github"
},
"original": {
"owner": "NixOs",
"ref": "nixos-22.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

53
flake.nix Normal file
View file

@ -0,0 +1,53 @@
{
description = "IGNOU Telegram Bot v2.0";
inputs.nixpkgs.url = "github:NixOs/nixpkgs/nixos-22.11";
inputs.flake-utils.url = "github:numtide/flake-utils/v1.0.0";
outputs = inputs: (inputs.flake-utils.lib.eachDefaultSystem ( system :
let
pname = "ignou";
version = "2.0";
pkgs = inputs.nixpkgs.legacyPackages.${system};
pyEnv = pkgs.python3.withPackages (p: with p; [
pyrogram
tgcrypto
httpx
prettytable
beautifulsoup4
lxml
]);
ignouScript = pkgs.writeShellScriptBin "start-bot" ''
cd ${ignou}
${pyEnv}/bin/python3 -m bot'';
ignou = pkgs.stdenv.mkDerivation {
pname = "ignou-telegram-bot";
version = "2.0";
runtimeDependencies = [ pyEnv ];
src = ./.;
installPhase = ''
mkdir -p $out/bot
cp -r bot/* $out/bot/
'';
};
in rec {
packages.default = pkgs.buildEnv {
name = "${pname}-${version}";
paths = [ ignou ignouScript ];
};
devShell = pkgs.mkShell {
buildInputs = [ pyEnv ];
};
})) // {
nixosModules.default = import ./nix/module.nix inputs;
};
}

50
nix/module.nix Normal file
View file

@ -0,0 +1,50 @@
inputs: {
config,
lib,
pkgs,
...
}:
let
cfg = config.services.ignou;
ignou = inputs.self.packages.${pkgs.stdenv.hostPlatform.system}.default;
in
with lib;
{
options = {
services.ignou = {
enable = mkEnableOption "Enable the IGNOU Bot service";
BOT_TOKEN = mkOption {
type = types.str;
description = "Telegram Bot Token";
};
};
};
config = mkIf cfg.enable {
systemd.services.ignou = {
description = "IGNOU Telegram Bot";
after = [ "network.target" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
Type = "oneshot";
ExecStart = "${ignou}/bin/start-bot";
};
environment = {
BOT_TOKEN = cfg.BOT_TOKEN;
};
};
};
}

View file

@ -1,35 +0,0 @@
aiohttp==3.8.4
aiosignal==1.3.1
APScheduler==3.10.0
async-timeout==4.0.2
asyncio==3.4.3
attrs==22.2.0
beautifulsoup4==4.11.2
certifi==2022.12.7
cffi==1.15.1
charset-normalizer==3.0.1
cryptography==39.0.1
dnspython==2.3.0
frozenlist==1.3.3
idna==3.4
lxml==4.9.2
motor==3.1.1
multidict==6.0.4
prettytable==3.6.0
pyaes==1.6.1
pycparser==2.21
pymongo==4.3.3
pyOpenSSL==23.0.0
Pyrogram==2.0.99
PySocks==1.7.1
pytz==2022.7.1
pytz-deprecation-shim==0.1.0.post0
requests==2.28.2
six==1.16.0
soupsieve==2.4
TgCrypto==1.2.5
tzdata==2022.7
tzlocal==4.2
urllib3==1.26.14
wcwidth==0.2.6
yarl==1.8.2

View file

@ -1 +0,0 @@
python-3.9.2