code refactoring

This commit is contained in:
Milka64 2023-12-26 13:19:46 +01:00
parent 5792a88911
commit c47df4b78c
30 changed files with 804 additions and 1209 deletions

2
.idea/.gitignore vendored
View file

@ -1,2 +0,0 @@
# Default ignored files
/workspace.xml

View file

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7 (kivy_tuto)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View file

@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (kivy_tuto)" project-jdk-type="Python SDK" />
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/kabot.iml" filepath="$PROJECT_DIR$/.idea/kabot.iml" />
</modules>
</component>
</project>

View file

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View file

@ -1,14 +0,0 @@
FROM alpine
WORKDIR /app
VOLUME /data
RUN apk add --update musl-dev python3-dev gcc python3 py3-pip git py3-lxml libxml2 libxml2-dev py3-pynacl ffmpeg\
&& pip3 install --upgrade pip \
&& pip3 install bs4 requests giphy_client discord.py python-gitlab\
&& git clone https://git.0w.tf/Milka64/kabot.git
WORKDIR /app/kabot
RUN pip3 install kabot/
RUN mkdir -p /var/log/kabot/
RUN chmod 777 /var/log/kabot
CMD kabot -c /data/config.ini

View file

@ -1,210 +0,0 @@
##############################################################################
#
# Copyright (c) 2006 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Bootstrap a buildout-based project
Simply run this script in a directory containing a buildout.cfg.
The script accepts buildout command-line options, so you can
use the -c option to specify an alternate configuration file.
"""
import os
import shutil
import sys
import tempfile
from optparse import OptionParser
__version__ = '2015-07-01'
# See zc.buildout's changelog if this version is up to date.
tmpeggs = tempfile.mkdtemp(prefix='bootstrap-')
usage = '''\
[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
Bootstraps a buildout-based project.
Simply run this script in a directory containing a buildout.cfg, using the
Python that you want bin/buildout to use.
Note that by using --find-links to point to local resources, you can keep
this script from going over the network.
'''
parser = OptionParser(usage=usage)
parser.add_option("--version",
action="store_true", default=False,
help=("Return bootstrap.py version."))
parser.add_option("-t", "--accept-buildout-test-releases",
dest='accept_buildout_test_releases',
action="store_true", default=False,
help=("Normally, if you do not specify a --version, the "
"bootstrap script and buildout gets the newest "
"*final* versions of zc.buildout and its recipes and "
"extensions for you. If you use this flag, "
"bootstrap and buildout will get the newest releases "
"even if they are alphas or betas."))
parser.add_option("-c", "--config-file",
help=("Specify the path to the buildout configuration "
"file to be used."))
parser.add_option("-f", "--find-links",
help=("Specify a URL to search for buildout releases"))
parser.add_option("--allow-site-packages",
action="store_true", default=False,
help=("Let bootstrap.py use existing site packages"))
parser.add_option("--buildout-version",
help="Use a specific zc.buildout version")
parser.add_option("--setuptools-version",
help="Use a specific setuptools version")
parser.add_option("--setuptools-to-dir",
help=("Allow for re-use of existing directory of "
"setuptools versions"))
options, args = parser.parse_args()
if options.version:
print("bootstrap.py version %s" % __version__)
sys.exit(0)
######################################################################
# load/install setuptools
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
ez = {}
if os.path.exists('ez_setup.py'):
exec(open('ez_setup.py').read(), ez)
else:
exec(urlopen('https://bootstrap.pypa.io/ez_setup.py').read(), ez)
if not options.allow_site_packages:
# ez_setup imports site, which adds site packages
# this will remove them from the path to ensure that incompatible versions
# of setuptools are not in the path
import site
# inside a virtualenv, there is no 'getsitepackages'.
# We can't remove these reliably
if hasattr(site, 'getsitepackages'):
for sitepackage_path in site.getsitepackages():
# Strip all site-packages directories from sys.path that
# are not sys.prefix; this is because on Windows
# sys.prefix is a site-package directory.
if sitepackage_path != sys.prefix:
sys.path[:] = [x for x in sys.path
if sitepackage_path not in x]
setup_args = dict(to_dir=tmpeggs, download_delay=0)
if options.setuptools_version is not None:
setup_args['version'] = options.setuptools_version
if options.setuptools_to_dir is not None:
setup_args['to_dir'] = options.setuptools_to_dir
ez['use_setuptools'](**setup_args)
import setuptools
import pkg_resources
# This does not (always?) update the default working set. We will
# do it.
for path in sys.path:
if path not in pkg_resources.working_set.entries:
pkg_resources.working_set.add_entry(path)
######################################################################
# Install buildout
ws = pkg_resources.working_set
setuptools_path = ws.find(
pkg_resources.Requirement.parse('setuptools')).location
# Fix sys.path here as easy_install.pth added before PYTHONPATH
cmd = [sys.executable, '-c',
'import sys; sys.path[0:0] = [%r]; ' % setuptools_path +
'from setuptools.command.easy_install import main; main()',
'-mZqNxd', tmpeggs]
find_links = os.environ.get(
'bootstrap-testing-find-links',
options.find_links or
('http://downloads.buildout.org/'
if options.accept_buildout_test_releases else None)
)
if find_links:
cmd.extend(['-f', find_links])
requirement = 'zc.buildout'
version = options.buildout_version
if version is None and not options.accept_buildout_test_releases:
# Figure out the most recent final version of zc.buildout.
import setuptools.package_index
_final_parts = '*final-', '*final'
def _final_version(parsed_version):
try:
return not parsed_version.is_prerelease
except AttributeError:
# Older setuptools
for part in parsed_version:
if (part[:1] == '*') and (part not in _final_parts):
return False
return True
index = setuptools.package_index.PackageIndex(
search_path=[setuptools_path])
if find_links:
index.add_find_links((find_links,))
req = pkg_resources.Requirement.parse(requirement)
if index.obtain(req) is not None:
best = []
bestv = None
for dist in index[req.project_name]:
distv = dist.parsed_version
if _final_version(distv):
if bestv is None or distv > bestv:
best = [dist]
bestv = distv
elif distv == bestv:
best.append(dist)
if best:
best.sort()
version = best[-1].version
if version:
requirement = '=='.join((requirement, version))
cmd.append(requirement)
import subprocess
if subprocess.call(cmd) != 0:
raise Exception(
"Failed to execute command:\n%s" % repr(cmd)[1:-1])
######################################################################
# Import and run buildout
ws.add_entry(tmpeggs)
ws.require(requirement)
import zc.buildout.buildout
if not [a for a in args if '=' not in a]:
args.append('bootstrap')
# if -c was provided, we push it back into args for buildout' main function
if options.config_file is not None:
args[0:0] = ['-c', options.config_file]
zc.buildout.buildout.main(args)
shutil.rmtree(tmpeggs)

View file

@ -1,45 +0,0 @@
[buildout]
show-picked-versions = true
extends = pinned.cfg
package = kabot
develop =
kabot
mr-dev/discord
mr-dev/giphy
parts +=
kabot
doc
extensions = mr.developer
sources-dir = mr-dev
auto-checkout =
discord
giphy
[kabot]
recipe = zc.recipe.egg
eggs =
${buildout:package}
cookiecutter
discord.py
PyNaCl
requests
giphy_client
python-gitlab
interpreter = python
[doc]
recipe = zc.recipe.egg
eggs =
${buildout:package}
[sources]
discord = git https://github.com/Rapptz/discord.py.git
giphy = git https://github.com/Giphy/giphy-python-client.git
[logging]
logger_root_level = DEBUG

View file

@ -1,15 +0,0 @@
* Kabot version:
* Python version:
* Operating System:
### Description
Describe what you were trying to get done.
Tell us what happened, what went wrong, and what you expected to happen.
### What I Did
```
Paste the command(s) you ran and the output.
If there was a crash, please include the traceback here.
```

View file

@ -1,88 +0,0 @@
.PHONY: clean clean-test clean-pyc clean-build docs help
.DEFAULT_GOAL := help
define BROWSER_PYSCRIPT
import os, webbrowser, sys
try:
from urllib import pathname2url
except:
from urllib.request import pathname2url
webbrowser.open("file://" + pathname2url(os.path.abspath(sys.argv[1])))
endef
export BROWSER_PYSCRIPT
define PRINT_HELP_PYSCRIPT
import re, sys
for line in sys.stdin:
match = re.match(r'^([a-zA-Z_-]+):.*?## (.*)$$', line)
if match:
target, help = match.groups()
print("%-20s %s" % (target, help))
endef
export PRINT_HELP_PYSCRIPT
BROWSER := python -c "$$BROWSER_PYSCRIPT"
help:
@python -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST)
clean: clean-build clean-pyc clean-test ## remove all build, test, coverage and Python artifacts
clean-build: ## remove build artifacts
rm -fr build/
rm -fr dist/
rm -fr .eggs/
find . -name '*.egg-info' -exec rm -fr {} +
find . -name '*.egg' -exec rm -f {} +
clean-pyc: ## remove Python file artifacts
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
clean-test: ## remove test and coverage artifacts
rm -fr .tox/
rm -f .coverage
rm -fr htmlcov/
rm -fr .pytest_cache
lint: ## check style with flake8
flake8 kabot tests
test: ## run tests quickly with the default Python
python setup.py test
test-all: ## run tests on every Python version with tox
tox
coverage: ## check code coverage quickly with the default Python
coverage run --source kabot setup.py test
coverage report -m
coverage html
$(BROWSER) htmlcov/index.html
docs: ## generate Sphinx HTML documentation, including API docs
rm -f docs/kabot.rst
rm -f docs/modules.rst
sphinx-apidoc -o docs/ kabot
$(MAKE) -C docs clean
$(MAKE) -C docs html
$(BROWSER) docs/_build/html/index.html
servedocs: docs ## compile the docs watching for changes
watchmedo shell-command -p '*.rst' -c '$(MAKE) -C docs html' -R -D .
release: dist ## package and upload a release
twine upload dist/*
dist: clean ## builds source and wheel package
python setup.py sdist
python setup.py bdist_wheel
ls -l dist
install: clean ## install the package to the active Python's site-packages
python setup.py install

View file

@ -1,612 +1,40 @@
# -*- coding: utf-8 -*-
# This example requires the 'message_content' privileged intent to function.
"""Main module."""
from __future__ import unicode_literals
import aiocron
import asyncio
import discord
import giphy_client
import gitlab
import logging
import lxml
import os
import random
import requests
import youtube_dl
import configparser
import argparse
import typing
import functools
import yt_dlp as youtube_dl
from bs4 import BeautifulSoup as bs
from discord.ext import tasks, commands
from giphy_client.rest import ApiException
from logging.handlers import RotatingFileHandler
from pathlib import Path
from subprocess import *
from sys import argv,exit, exc_info
here = os.path.dirname(os.path.abspath(__file__))
## création de l'objet logger
logger = logging.getLogger()
## definition du log level
logger.setLevel(logging.INFO)
## format du log
formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
## definition de l'affichage des logs dans la console
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)
from discord.ext import commands
from utils.audio import *
from utils.texte import *
ffmpeg_options = {
'options': '-vn -loglevel debug',
}
#class Mybot(commands.Cog):
class Mybot(discord.Client):
#Fonctions necesaires pour Kabot.
def __init__(
self,
bot,
intents=None,
gl_url=None,
gl_token=None,
gif_token=None,
audio_path=None,
nickname=None,
voice_channel=None,
text_channel=None,
):
self.gl_url = gl_url
self.audio_path = audio_path
self.gl_token = gl_token
self.gif_token = gif_token
self.voice_channel = voice_channel
self.text_channel = text_channel
self.bot = bot
self.sounds = []
self.nickname = nickname
self.sounds_history = []
#self.play_next.start()
intents = discord.Intents.all()
intents.message_content = True
async def setup_hook(self) -> None:
# create the background task and run it in the background
self.bg_task = self.loop.create_task(self.play_next())
#@tasks.loop(seconds=1.5)
async def play_next(self):
if self.sounds:
audio_file, ctx = self.sounds[0]
print(type(audio_file))
source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(audio_file,**ffmpeg_options))
if not ctx.voice_client.is_playing() and ctx.voice_client.is_connected():
ctx.voice_client.play(source, after=lambda e: print('Player error: %s' % e) if e else None)
self.sounds.pop(0)
self.sounds_history.reverse()
self.sounds_history.append((audio_file, ctx))
self.sounds_history.reverse()
if len(self.sounds_history) > 5:
self.sounds_history = self.sounds_history[:5]
#Fin des fonctions necesaire pour Kabot.
#Les commandes pour interagir avec Kabot.
@commands.command(help='Clear sound queue')
async def clear(self, ctx):
self.sounds = []
@commands.command(help='list des commits')
async def commits(self, ctx, number = 5):
if self.gl_url and self.gl_token:
number = int(number)
gl = gitlab.Gitlab(self.gl_url, self.gl_token)
gl.auth()
projects = gl.projects.list(search='Kabot')[0]
commits = projects.commits.list(all=True)[:number]
for commit in commits:
detail = commit.attributes
await ctx.channel.send("__" + detail['author_name'] + "__: " + detail['title'] + '\n' + detail['web_url'])
else:
await ctx.channel.send("-_-")
bot = commands.Bot(
command_prefix=commands.when_mentioned_or("!"),
description='''A ROULEEEEETTES !!
HOULA... J'l'ai un peu trop gueulé ça, non ?
A roulettes.''',
intents=intents,
)
@commands.command(help='clear cache youtube dl')
async def cache_clear(self, ctx):
fichiers = os.listdir('/tmp')
for fichier in fichiers:
if fichier.startswith('discord'):
os.remove("/tmp/" + fichier)
@commands.command()
async def leave(self, ctx):
if ctx.voice_client and ctx.voice_client.is_connected():
await ctx.guild.voice_client.disconnect()
@commands.command(help="Interrogation issues \n Args: list, search[mot clé] et add[nom de l'issue]")
async def issue(self, ctx, *args):
if self.gl_url and self.gl_token:
if args:
args = list(args)
gl = gitlab.Gitlab(self.gl_url, self.gl_token)
gl.auth()
if args[0] == 'list':
projects = gl.projects.list(search='Kabot')[0]
issues = projects.issues.list()
for issue in issues:
if "closed" == issue.state:
pass
else:
await ctx.channel.send('#' + str(issue.id) + ": " + issue.title + '\n' + issue.web_url)
elif args[0] == 'search':
query = ''.join(args[1:])
project = gl.projects.list(search='Kabot')[0]
find_issues = project.search("issues", query)
for issue in find_issues:
await ctx.channel.send("#" + str(issue['id']) + ": " + issue['title'] + '\n' + issue['web_url'])
elif args[0] == 'add':
title = ' '.join(args[1:])
author = title + ' - By ' + ctx.message.author.name
projects = gl.projects.list()
for project in projects:
if "Kabot" == project.name:
issue = project.issues.create({'title': author})
logger.info("Issue created : %s" % issue.web_url)
else:
await ctx.channel.send('unknown command')
@commands.command()
async def join(self, ctx):
channel = [x for x in self.bot.get_all_channels() if x.name == self.voice_channel][0]
await channel.connect()
await asyncio.sleep(2)
@commands.command(help="detail du dernier son joué")
async def last(self, ctx, number = 1):
number = int(number)
for sound in self.sounds_history[0:number]:
await ctx.channel.send("```"+str(sound[0])+"```")
@commands.command(help='count lines numbers in quote file')
async def lines(self, ctx):
path = '/data/log/%s.log' % ctx.channel
with open(path, 'r') as f:
lines = f.read().splitlines()
nb_lines = len(lines)
with ctx.channel.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("j'ai %s lignes dans mon stock" % nb_lines)
@commands.command(help='check if bot always online')
async def ping(self, message):
print('trying to repondre')
await message.channel.send('pong')
@commands.command(help='Restart Bot')
async def restart(self, ctx):
cmd = self.bot.get_command('leave')
await cmd.invoke(ctx)
await self.bot.close()
exit()
@commands.command(help='Update local repo')
async def update(self, message):
output = Popen('git pull'.split(), stdout=PIPE).communicate()[0]
cmd_audio = "git -C %s pull" % self.audio_path
output += Popen(cmd_audio.split(), stdout=PIPE).communicate()[0]
await message.channel.send(output.decode('utf-8'))
#Fin des commandes pour interagir avec Kabot.
#Les commandes pour faire mumuse avec Kabot.
@commands.command(help="randomsur l'avenir des gens.")
async def avenir(self, ctx):
list_mot = ("tu seras curé, tu t'occuperas plus spécialement du catéchisme. ",
"tu seras animateur de soirées pour les gays pride. ",
"tu seras gynecologue dans une maison de retraite.",
"tu iras vivre en thaïland à cause de ton job. car tu seras ladyboy dans un bar.",
"tu sera DSI chez jacky et Michel",
"tu seras arroseur de plante aquatique.")
choix = random.choice(list_mot)
with ctx.channel.typing():
await asyncio.sleep(len(choix) / 4)
await ctx.channel.send(choix)
@commands.command(help="j'adore la cuisine")
async def ciboulette(self, ctx):
cmd = self.bot.get_command('gif')
ctx.message.content = "!gif ciboulette"
await cmd.invoke(ctx)
@bot.event
async def on_ready():
print(f'Logged in as {bot.user} (ID: {bot.user.id})')
print('------')
@commands.command(help='Faire des choix')
async def choice(self, ctx, *words):
choices = random.choice(words)
await ctx.channel.send(choices)
async def main():
## création de l'objet logger
logger = logging.getLogger()
## definition du log level
logger.setLevel(logging.INFO)
@commands.command(help="optionnal args : ")
async def contrepeterie(self, ctx, *args):
response = None
path = here + '/ressources/contrepeteries.txt'
with open(path) as file:
lines = file.read().splitlines()
myline = random.choice(lines)
question, reponse = myline.split(";")
try:
response = '''Question ! : %s Réponse : ||%s||''' % (question, reponse)
except:
response = "Unknow error, try: !contrepeterie [mot clef]"
await ctx.send(response)
@commands.command(help='Gif me')
async def gif(self, ctx):
query = ctx.message.content.replace('!gif ', '')
api_instance = giphy_client.DefaultApi()
api_key = self.gif_token
lang = 'fr'
if api_key:
try:
api_response = api_instance.gifs_search_get(api_key, query, lang=lang, limit=15)
api_response.to_dict()['data'][0]
get_url = random.choice(api_response.to_dict()['data'])
get_url['url']
await ctx.channel.send(get_url['url'])
except ApiException as e:
await ctx.channel.send("Exception when calling DefaultApi->gifs_search_get: %s\n" % e)
else:
await ctx.channel.send("Exception : No api key found")
@commands.guild_only()
@commands.command()
async def joke(self, ctx, folder=None):
try:
user = ctx.message.author.name
if not folder or not ctx.message.content:
audio_file = random.choice([f"{f}" for f in Path(self.audio_path + '/').glob('**/*.mp3')])
else:
folder = folder.lower()
audio_file = [f"{f}" for f in Path(self.audio_path + '/').glob('**/*.mp3') if folder in str(f).lower()]
if not audio_file:
ctx.channel.send("%s not found" % folder)
else:
audio_file = random.choice(audio_file)
self.sounds.append((audio_file, ctx))
except:
e = exc_info()[0]
await ctx.channel.send("Exception when calling joke: %s\n" % e)
@commands.command(help="optionnal args : [livre] [character]")
async def kaamelott(self, ctx, *args):
response = None
url = 'https://kaamelott.chaudie.re/api/random'
if args and ctx.message.content:
args = list(args)
if args[0].isdigit():
livre = int(args[0])
args.pop(0)
elif args[-1].isdigit():
livre = int(args[-1])
args.pop(-1)
else:
livre = None
if args:
perso = ' '.join(args)
if perso and livre:
url = 'https://kaamelott.chaudie.re/api/random/livre/%s/personnage/%s' % (livre, perso)
elif perso:
url = 'https://kaamelott.chaudie.re/api/random/personnage/%s' % perso
else:
url = 'https://kaamelott.chaudie.re/api/random/livre/%s' % livre
try:
citation = requests.get(url).json()['citation']
response = "%s :\n```\n%s\n```" % (citation['infos']['personnage'], citation['citation'])
except:
response = "Unknow error, try: !kaamelott [livre] [character]"
await ctx.send(response)
@commands.command(help="Je menotte une cornemuse et je fume Eddy Malou")
async def kamoulox(self, ctx):
sans_verbe = get_word('nom').text + " " + get_word('complement').get('m') + " et " + get_word('nom').text + " " + get_word('complement').get('m') + "."
nom1 = get_word('nom')
nom2 = get_word('nom')
un1 = "un"
un2 = "un"
if nom1.get('gender') == 'F':
un1 = "une"
if nom2.get('gender') == 'F':
un2 = "une"
phrase1 = get_word('verbe').text + " " + un1 + " " + nom1.text + " " + random.choice([get_word('complement').get('m'), ""])
phrase2 = get_word('verbe').text + " " + un2 + " " + nom2.text + " " + random.choice([get_word('complement').get('m'), ""])
avec_verbe = phrase1 + " et " + phrase2 + "."
piece = random.choice(['pile', 'face'])
if piece == "pile":
result = sans_verbe
elif piece == "face":
result = avec_verbe
with ctx.channel.typing():
await asyncio.sleep(len(result)/6)
await ctx.send(result)
@commands.command(help="Toi tu fermes ta gueule! Tu la fermes définitivement")
async def mute(self, ctx, member: discord.Member=None, mute_time = 10):
if member.voice.mute:
with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Tu ne vas pas m'avoir si facilement")
return
if not member:
with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Qui veux-tu mute?")
return
if mute_time > 60:
with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Doucement sur le temps!")
return
member = ctx.message.mentions[0]
await member.edit(mute=True)
with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.send("Tu sors %s!" % member.mention)
await asyncio.sleep(mute_time)
await member.edit(mute=False)
with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Tu peux reparler %s!" % member.mention)
@commands.command(help="Jouer un song a partir d'une video")
async def play(self, ctx, url):
ydl_opts = {'format': 'bestaudio/mp3', 'outtmpl': '/tmp/discord_%(title)s-%(id)s.%(ext)s',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
info_dict = ydl.extract_info(url, download=False)
out_file = ydl.prepare_filename(info_dict)
extension = out_file.split('.')[-1]
out_file = out_file.replace(extension, 'mp3')
if not os.path.isfile(out_file):
await asyncio.get_running_loop().run_in_executor(None, ydl.download,[url])
self.sounds.append((out_file, ctx))
@commands.command()
async def repeat(self, ctx):
# if ctx.message.author.name == self.bot.user.name:
# return
user = ctx.message.author.name
source = self.sounds_history[0]
self.sounds.append(source)
@commands.command(help="Appuie sur la detente PUSSY!")
async def roulette(self, ctx):
mute_time = 60
member = ctx.message.author
barillet = [False, False, True, False, False]
bullet = random.choice(barillet)
await self.joke(ctx, "spinning_and_closed.mp3")
if bullet == True:
await self.joke(ctx, "omawa_shindeiru.mp3")
await asyncio.sleep(2)
await member.edit(mute=True)
with ctx.channel.typing():
await asyncio.sleep(3)
await ctx.channel.send("Perdu, tu es mute pendant 60 secondes!")
else:
await self.joke(ctx, "hammer_and_dry.mp3")
with ctx.channel.typing():
await asyncio.sleep(3)
await ctx.channel.send("Gagné, tu ne seras pas mute!")
return
await asyncio.sleep(mute_time)
await member.edit(mute=False)
with ctx.channel.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("Tu peux reparler %s!" % member.mention)
@commands.dm_only()
@commands.command(help="Faire dire des choses au bot")
async def say(self, ctx, *message):
sentence = ' '.join(message)
channel = [x for x in self.bot.get_all_channels() if x.name == self.text_channel][0]
guild = self.bot.guilds[0]
if sentence.startswith('!'):
command_name = sentence.split()[0].replace('!', '')
cmd = self.bot.get_command(command_name)
ctx.channel = channel
ctx.guild = guild
for word in sentence.split():
if word.startswith('@'):
members = guild.members
for member in members:
if member.name == word[1:]:
sentence = sentence.replace(word, member.mention)
ctx.message = sentence
await cmd.invoke(ctx)
else:
for word in sentence.split():
if word.startswith('@'):
members = guild.members
for member in members:
if member.name == word[1:]:
sentence = sentence.replace(word, member.mention)
await channel.send(sentence)
@commands.command(help='Who the fuck am i?')
async def schizo(self, ctx, *names):
name = ' '.join(names)
list_name = ["Kabot", "Gaspard et Balthazar", "Bender", "Zoidberg", "Gunther", "MissSaugnacEtCambran2022"]
try:
current_name = self.bot.user.name
list_name.remove(current_name)
except:
pass
if not name:
name = random.choice(list_name)
lower_names = [x.lower() for x in list_name]
if name.lower() in lower_names:
if name:
correct_name = [x for x in list_name if x.lower() in name.lower()][0]
name = correct_name
await self.bot.user.edit(username=name)
else:
name = "404"
await self.bot.user.edit(username="Error 404 name not found!")
await ctx.channel.send("Liste des noms = %s" % list_name)
img = open(here + "/ressources/avatar_bot/%s.jpg" % name, 'rb')
await self.bot.user.edit(avatar=img.read())
await self.joke(ctx, name)
@commands.command(help='slap this ass')
async def slap(self, ctx, user=None):
slap_multiple = [
"%s prend un coup de pied au cul",
"Descente du coude sur %s",
"%s est propulsé par dessus la TROISIEME CORDE!",
"Le mec en rose, c'est moi et le mec en jaune c'est %s! https://giphy.com/gifs/gSIz6gGLhguOY",
]
if not user or not ctx.message.mentions:
online_members = []
members = ctx.guild.members
for member in members:
if str(member.status) == "online":
online_members.append(member)
user = random.choice(online_members)
user = user.mention
elif ctx.message.mentions:
user = ctx.message.mentions[0]
user = user.mention
if user == self.bot.user.mention:
with ctx.channel.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("je tribuche par terre et je sais pas comment")
else:
with ctx.channel.typing():
await asyncio.sleep(len(slap_multiple) / 4)
await ctx.channel.send(random.choice(slap_multiple) % user)
@commands.command()
async def welcome(self, ctx):
# if ctx.message.author.name == self.bot.user.name:
# return
user = ctx.message.author.name
print(user)
try:
audio_file = random.choice([f for f in os.listdir(self.audio_path + '/%s/' % user) if f.endswith('.mp3')])
audio_file = self.audio_path + '/%s/' % user + audio_file
except:
audio_file = random.choice([f"{f}" for f in Path(self.audio_path + '/').glob('**/*.mp3')])
self.sounds.append((audio_file, ctx))
#Fin des commandes pour faire mumuse avec Kabot.
#Commandes pour troll.
@commands.command(help='Troll commands', hidden=True)
async def jke(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def joe(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def jok(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def joker(self, ctx):
await ctx.channel.send(trollpower(too_long=True))
@commands.command(help='Troll commands', hidden=True)
async def oke(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def okre(self, ctx):
await self.joke(ctx, "tu_dois_tout_donner.mp3")
#Fin des commandes pour troll.
def trollpower(too_long=None):
if too_long:
return("Bah alors, on sait plus écrire, je te donne un indice: t'as une lettre en trop! :sweat_drops: :tongue:")
return('Bah alors, on sait plus écrire, je te donne un indice: il te manque une lettre! :sweat_drops: :tongue:')
def get_word(word_type):
"""Chercher les mots pour la fonction kamoulox dans le fichier xml"""
content = []
with open(here + "/ressources/base_kml.xml", "r", encoding="ISO-8859-1") as file:
content = file.readlines()
content = "".join(content)
bs_content = bs(content, 'lxml')
if word_type == 'nom':
nom = bs_content.resources.nom.find_all('word')
result = random.choice(nom)
return result
elif word_type == 'nom_propre':
nom_propre = bs_content.resources.nompropre.find_all('word')
result = random.choice(nom_propre)
return result
elif word_type == 'verbe':
verbe = bs_content.resources.verbe.find_all('word')
result = random.choice(verbe)
return result
elif word_type == 'complement':
complement = bs_content.resources.complement.find_all('word')
result = random.choice(complement)
return result
elif word_type == 'nom_special':
nom_special = bs_content.resources.complement.find_all('word')
result = random.choice(nom_special)
return result
elif word_type == 'prenom':
prenom = bs_content.resources.prenom.find_all('word')
result = random.choice(prenom)
return result
elif word_type == 'prescuse':
prescuse = bs_content.resources.prescuse.find_all('word')
result = random.choice(prescuse)
return result
elif word_type == 'scuse1':
scuse1 = bs_content.resources.scuse1.find_all('word')
result = random.choice(scuse1)
return result
elif word_type == 'scuse2':
scuse2 = bs_content.resources.scuse2.find_all('word')
result = random.choice(scuse2)
return result
elif word_type == 'presulte':
presulte = bs_content.resources.presulte.find_all('word')
result = random.choice(presulte)
return result
elif word_type == 'sulte':
sulte = bs_content.resources.sulte.find_all('word')
result = random.choice(sulte)
return result
elif word_type == 'postsulte':
postsulte = bs_content.resources.presulte.find_all('word')
result = random.choice(postsulte)
return result
elif word_type == 'mail':
mail = bs_content.resources.mail.find_all('word')
result = random.choice(mail)
return result
else:
result = 'Nique bien ta mère!'
return result
#Le do[main]e de Kabot.
def main():
intents = discord.Intents.all()
intents.members = True
intents.messages = True
logger.info("Initialisation de Kabot")
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
@ -623,6 +51,7 @@ def main():
log_file = config['DEFAULT']['logs']
## definition du fichier de log (chemin, level, etc ...)
formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
file_handler = RotatingFileHandler(log_file, 'a', 1000000, 1)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
@ -645,145 +74,25 @@ def main():
gl_url = config['gitlab']['url']
gl_token = config['gitlab']['token']
gif_token = config['giphy']['token']
bot = commands.Bot(
command_prefix='!',
intents=intents,
audio_path="/home/mika/kabot-audio",
)
bot.add_cog(
Mybot(
bot,
intents = intents,
gl_url=gl_url,
gl_token=gl_token,
gif_token=gif_token,
audio_path="/home/mika/kabot-audio",
nickname=nickname,
voice_channel=voice_channel,
text_channel=text_channel,
async with bot:
await bot.add_cog(
Audio(
bot,
voice_chan=voice_channel,
text_chan=text_channel,
audio_path='/home/mika/kabot_audio/'
)
)
)
test = False
if "test" in argv[-1]:
test=True
@aiocron.crontab('50 9-23/1 * * *')
async def kron():
kron_choice = random.choice(["kaamelott", "slap", "kamoulox", "contrepeterie", "schizo", False, False, False, False, False, False])
if kron_choice:
await asyncio.sleep(random.choice(range(3550)))
for channel in bot.get_all_channels():
if channel.name == text_channel:
current_chan = channel
ctx = await bot.get_context(current_chan.last_message)
ctx.message.content = ""
print(ctx.message.content)
random_cmd = bot.get_command(kron_choice)
await random_cmd.invoke(ctx)
@bot.event
async def on_message(message):
print(message.content)
if message.author == bot.user:
return
else:
print(message.content)
if bot.user in message.mentions \
and len(message.mentions) < 3 \
and len(message.content.splitlines()) == 1:
path = '/data/log/%s.log' % message.channel
with open(path, 'r') as f:
lines = f.read().splitlines()
if not message.content in lines:
with open(path, 'a') as f:
f.write(message.content + '\n')
response = random.choice(lines).replace(str(bot.user.id), str(message.author.id))
if response.startswith('!'):
print('test')
command_name = response.split()[0].replace('!', '')
cmd = bot.get_command(command_name)
await cmd.invoke(message)
else:
async with message.channel.typing():
if "http" in response:
await asyncio.sleep(len(response) / 8)
else:
await asyncio.sleep(len(response) / 6)
await message.channel.send(response)
await bot.process_commands(message)
@bot.command(help='demerde toi')
async def join(interaction: discord.Interaction):
channel = [x for x in bot.get_all_channels() if x.name == voice_channel][0]
await channel.connect()
await asyncio.sleep(2)
@bot.command()
async def welcome(interaction: discord.Interaction):
# if ctx.message.author.name == self.bot.user.name:
# return
user = interaction.message.author.name
print(user)
try:
audio_file = random.choice([f for f in os.listdir(audio_path + '/%s/' % user) if f.endswith('.mp3')])
audio_file = audio_path + '/%s/' % user + audio_file
except:
audio_file = random.choice([f"{f}" for f in Path(audio_path + '/').glob('**/*.mp3')])
bot.sounds.append((audio_file, interaction))
@bot.event
async def on_ready():
print('yeah baby!')
if test:
await bot.close()
for channel in bot.get_all_channels():
if channel.name == text_channel:
current_chan = channel
if nickname:
await bot.user.edit(nick=nickname)
await current_chan.send('Le troll est dans la place !')
ctx = await bot.get_context(current_chan.last_message)
#join = bot.get_command('join')
#await join.invoke(ctx)
for channel in bot.get_all_channels():
if channel.name == voice_channel:
for member in channel.members:
await member.edit(mute=False)
@bot.event
async def on_voice_state_update(member, before, after):
if before.channel is None and after.channel:
for channel in bot.get_all_channels():
if channel.name == text_channel:
current_chan = channel
ctx = await bot.get_context(current_chan.last_message)
ctx.message.author = member
#welcome = bot.get_command('welcome')
await asyncio.sleep(2)
#await welcome.invoke(ctx)
@aiocron.crontab('*/5 * * * *')
async def pipelette():
for channel in bot.get_all_channels():
if channel.name == voice_channel and\
len(channel.members) > 1 and\
random.choice([True,True,True]):
for channel in bot.get_all_channels():
if channel.name == text_channel:
current_chan = channel
await asyncio.sleep(random.choice(range(350)))
ctx = await bot.get_context(current_chan.last_message)
ctx.message.content = ""
joke = bot.get_command('joke')
await joke.invoke(ctx)
@bot.command(help='check if bot always online')
async def ping(ctx):
print("trying to repondre again")
await ctx.channel.send('pong')
bot.run(token)
await bot.add_cog(
Texte(
bot,
gif_token=gif_token,
gl_token=gl_token,
gl_url=gl_url,
text_chan=text_channel,
)
)
await bot.start(token)
if __name__ == "__main__":
main()
asyncio.run(main())

View file

@ -0,0 +1 @@
##

304
kabot/kabot/utils/audio.py Normal file
View file

@ -0,0 +1,304 @@
# This example requires the 'message_content' privileged intent to function.
import asyncio
import discord
import yt_dlp as youtube_dl
import random
import os
from discord.ext import commands, tasks
from pathlib import Path
# Suppress noise about console usage from errors
youtube_dl.utils.bug_reports_message = lambda: ''
ytdl_format_options = {
'format': 'bestaudio/mp3',
'outtmpl': '/tmp/discord_%(title)s-%(id)s.%(ext)s',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}]
}
ffmpeg_options = {
'options': '-vn',
}
ytdl = youtube_dl.YoutubeDL(ytdl_format_options)
class YTDLSource(discord.PCMVolumeTransformer):
def __init__(self, source, *, data, volume=0.5):
super().__init__(source, volume)
self.data = data
self.title = data.get('title')
self.url = data.get('url')
@classmethod
async def from_url(cls, url, *, loop=None, stream=False):
loop = loop or asyncio.get_event_loop()
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
if 'entries' in data:
# take first item from a playlist
data = data['entries'][0]
filename = data['url'] if stream else ytdl.prepare_filename(data)
extension = filename.split('.')[-1]
filename = filename.replace(extension, 'mp3')
return cls(discord.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
@classmethod
async def from_file(cls, audio_file, ctx, *, loop=None, stream=False):
source = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(audio_file,**ffmpeg_options))
ctx.voice_client.play(source, after=lambda e: print('Player error: %s' % e) if e else None)
class Audio(commands.Cog, name="Commandes Audio"):
def __init__(
self,
bot,
audio_path=None,
voice_chan=None,
text_chan=None):
self.bot = bot
self.sounds = []
self.voice_chan = voice_chan
self.text_chan = text_chan
self.audio_path = audio_path
self.sounds_history = []
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
if before.channel is None and after.channel:
for channel in self.bot.get_all_channels():
if channel.name == self.text_chan:
current_chan = channel
async for msg in current_chan.history(limit=1):
last_message = msg
ctx = await self.bot.get_context(last_message)
ctx.message.author = member
#welcome = bot.get_command('welcome')
await asyncio.sleep(2)
await self.welcome(ctx, member.display_name)
@commands.Cog.listener()
async def on_ready(self):
self.play_next.start()
self.pipelette.start()
if self.voice_chan:
await self.join(self.voice_chan)
#await self.welcome(None, 'kabot')
@commands.command(help="Toi tu fermes ta gueule! Tu la fermes définitivement")
async def mute(self, ctx, member: discord.Member=None, mute_time = 10):
if member.voice.mute:
async with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Tu ne vas pas m'avoir si facilement")
return
if not member:
async with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Qui veux-tu mute?")
return
if mute_time > 60:
async with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Doucement sur le temps!")
return
member = ctx.message.mentions[0]
await member.edit(mute=True)
async with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.send("Tu sors %s!" % member.mention)
await asyncio.sleep(mute_time)
await member.edit(mute=False)
async with ctx.channel.typing():
await asyncio.sleep(1)
await ctx.channel.send("Tu peux reparler %s!" % member.mention)
@commands.command(help="Appuie sur la detente PUSSY!")
async def roulette(self, ctx):
mute_time = 60
member = ctx.message.author
if not member.voice:
async with ctx.typing():
await ctx.channel.send("You're not in voice channel")
return
barillet = [False, False, True, False, False]
bullet = random.choice(barillet)
await self.joke(ctx, "spinning_and_closed.mp3")
if bullet == True:
await self.joke(ctx, "omawa_shindeiru.mp3")
await asyncio.sleep(2)
await member.edit(mute=True)
async with ctx.typing():
await asyncio.sleep(3)
await ctx.channel.send("Perdu, tu es mute pendant 60 secondes!")
else:
await self.joke(ctx, "hammer_and_dry.mp3")
async with ctx.typing():
await asyncio.sleep(3)
await ctx.channel.send("Gagné, tu ne seras pas mute!")
return
await asyncio.sleep(mute_time)
await member.edit(mute=False)
async with ctx.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("Tu peux reparler %s!" % member.mention)
@tasks.loop(seconds=300)
async def pipelette(self):
for channel in self.bot.get_all_channels():
if channel.name == self.voice_chan and\
len(channel.members) > 1 and\
random.choice([True,True,True]):
for chan in self.bot.get_all_channels():
if chan.name == self.text_chan:
current_chan = chan
await asyncio.sleep(random.choice(range(305)))
async for msg in current_chan.history(limit=1):
last_message = msg
ctx = await self.bot.get_context(last_message)
ctx.message.content = ""
await self.joke(ctx)
@tasks.loop(seconds=1)
async def play_next(self):
connected_voice = [chan for chan in self.bot.voice_clients if chan.is_connected()]
if connected_voice:
if not [chan for chan in connected_voice if chan.is_playing()]:
if self.sounds:
audio_type, audio_file, ctx = self.sounds[0]
if 'url' in audio_type:
player = await YTDLSource.from_url(audio_file, loop=self.bot.loop)
ctx.voice_client.play(player, after=lambda e: print(f'Player error: {e}') if e else None)
elif 'file' in audio_type:
player = await YTDLSource.from_file(audio_file, ctx)
last_audio = self.sounds.pop(0)
self.sounds_history.reverse()
self.sounds_history.append(last_audio)
self.sounds_history.reverse()
if len(self.sounds_history) > 5:
self.sounds_history = self.sounds_history[:5]
@commands.command(help='Troll commands', hidden=True)
async def okre(self, ctx):
await self.joke(ctx, "tu_dois_tout_donner.mp3")
@commands.command(help='Clear sound queue')
async def clear(self, ctx):
self.sounds = []
@commands.command()
async def welcome(self, ctx, user=None):
if not user:
user = ctx.message.author.name
try:
audio_file = random.choice([f for f in os.listdir(self.audio_path + '%s/' % user) if f.endswith('.mp3')])
audio_file = self.audio_path + '/%s/' % user + audio_file
except:
audio_file = random.choice([f for f in Path(self.audio_path + '/').glob('**/*.mp3')])
self.sounds.append(('file', audio_file, ctx))
@commands.command()
async def repeat(self, ctx):
# if ctx.message.author.name == self.bot.user.name:
# return
user = ctx.message.author.name
source = self.sounds_history[0]
self.sounds.append(source)
@commands.command(help="detail du dernier son joué")
async def last(self, ctx, number = 1):
number = int(number)
for sound in self.sounds_history[0:number]:
await ctx.channel.send("```"+str(sound[1])+"```")
@commands.command()
async def join(self, chan_name: discord.VoiceChannel):
"""Joins a voice channel"""
#if ctx.voice_client is not None:
# return await ctx.voice_client.move_to(channel)
channel = [x for x in self.bot.get_all_channels() if x.name == chan_name][0]
return await channel.connect()
await channel.connect()
@commands.command()
async def play(self, ctx, *, url):
"""Plays from a url (almost anything youtube_dl supports)"""
self.sounds.append(('url',url,ctx))
@commands.command()
async def stream(self, ctx, *, url):
"""Streams from a url (same as yt, but doesn't predownload)"""
async with ctx.typing():
player = await YTDLSource.from_url(url, loop=self.bot.loop, stream=True)
ctx.voice_client.play(player, after=lambda e: print(f'Player error: {e}') if e else None)
await ctx.send(f'Now playing: {player.title}')
@commands.command()
async def stop(self, ctx):
await ctx.voice_client.stop()
@commands.command()
async def volume(self, ctx, volume: int):
"""Changes the player's volume"""
if ctx.voice_client is None:
return await ctx.send("Not connected to a voice channel.")
ctx.voice_client.source.volume = volume / 100
await ctx.send(f"Changed volume to {volume}%")
@commands.command()
async def leave(self, ctx):
"""Stops and disconnects the bot from voice"""
await ctx.voice_client.disconnect()
@commands.command()
async def joke(self, ctx, folder=None):
try:
user = ctx.message.author.name
if not folder or not ctx.message.content:
audio_file = random.choice([f"{f}" for f in Path(self.audio_path + '/').glob('**/*.mp3')])
elif folder:
folder = folder.lower()
audio_files = [f for f in Path(self.audio_path + '/').glob('**/*.mp3') if folder in str(f).lower()]
if not audio_files:
await ctx.channel.send("%s not found" % folder)
return
else:
audio_file = random.choice(audio_files)
self.sounds.append(('file', audio_file, ctx))
except:
e = exc_info()[0]
await ctx.channel.send("Exception when calling joke: %s\n" % e)
@play.before_invoke
@joke.before_invoke
@repeat.before_invoke
@stream.before_invoke
async def ensure_voice(self, ctx):
if ctx.voice_client is None:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
await ctx.send("You are not connected to a voice channel.")
raise commands.CommandError("Author not connected to a voice channel.")
elif ctx.voice_client.is_playing():
pass

View file

Before

Width:  |  Height:  |  Size: 7.3 KiB

After

Width:  |  Height:  |  Size: 7.3 KiB

View file

Before

Width:  |  Height:  |  Size: 7.7 KiB

After

Width:  |  Height:  |  Size: 7.7 KiB

View file

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View file

Before

Width:  |  Height:  |  Size: 6.1 KiB

After

Width:  |  Height:  |  Size: 6.1 KiB

View file

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 5.4 KiB

View file

Before

Width:  |  Height:  |  Size: 8.4 KiB

After

Width:  |  Height:  |  Size: 8.4 KiB

View file

Before

Width:  |  Height:  |  Size: 9.3 KiB

After

Width:  |  Height:  |  Size: 9.3 KiB

453
kabot/kabot/utils/texte.py Normal file
View file

@ -0,0 +1,453 @@
# -*- coding: utf-8 -*-
"""Main module."""
from __future__ import unicode_literals
import aiocron
import asyncio
import discord
import giphy_client
import gitlab
import logging
import lxml
import os
import random
import requests
import configparser
import argparse
import typing
import functools
from bs4 import BeautifulSoup as bs
from discord.ext import tasks, commands
from giphy_client.rest import ApiException
from logging.handlers import RotatingFileHandler
from pathlib import Path
from subprocess import *
from sys import argv,exit, exc_info
here = os.path.dirname(os.path.abspath(__file__))
class Texte(commands.Cog):
#class Mybot(discord.Client):
#Fonctions necesaires pour Kabot.
def __init__(
self,
bot,
intents=None,
gl_url=None,
gl_token=None,
gif_token=None,
audio_path=None,
nickname=None,
voice_chan=None,
text_chan=None,
):
self.gl_url = gl_url
self.gl_token = gl_token
self.gif_token = gif_token
self.voice_chan = voice_chan
self.text_chan = text_chan
self.bot = bot
self.nickname = nickname
@commands.Cog.listener()
async def on_ready(self):
self.kron.start()
@commands.Cog.listener()
async def on_message(self, message):
if message.author == self.bot.user or message.content.startswith('!'):
return
else:
if self.bot.user in message.mentions \
and len(message.mentions) < 3 \
and len(message.content.splitlines()) == 1:
path = '/data/log/%s.log' % message.channel
with open(path, 'r') as f:
lines = f.read().splitlines()
if not message.content in lines:
with open(path, 'a') as f:
f.write(message.content + '\n')
response = random.choice(lines).replace(str(self.bot.user.id), str(message.author.id))
async with message.channel.typing():
if "http" in response:
await asyncio.sleep(len(response) / 8)
else:
await asyncio.sleep(len(response) / 6)
await message.channel.send(response)
await self.bot.process_commands(message)
@commands.command(help='list des commits')
async def commits(self, ctx, number = 5):
try:
if self.gl_url and self.gl_token:
number = int(number)
gl = gitlab.Gitlab(self.gl_url, self.gl_token)
gl.auth()
projects = gl.projects.list(search='Kabot')[0]
commits = projects.commits.list(all=True)[:number]
for commit in commits:
detail = commit.attributes
await ctx.channel.send("__" + detail['author_name'] + "__: " + detail['title'] + '\n' + detail['web_url'])
else:
await ctx.channel.send("-_-")
except:
await ctx.channel.send("-_-")
@commands.command(help="Interrogation issues \n Args: list, search[mot clé] et add[nom de l'issue]")
async def issue(self, ctx, *args):
if self.gl_url and self.gl_token:
if args:
args = list(args)
gl = gitlab.Gitlab(self.gl_url, self.gl_token)
gl.auth()
if args[0] == 'list':
projects = gl.projects.list(search='Kabot')[0]
issues = projects.issues.list()
for issue in issues:
if "closed" == issue.state:
pass
else:
await ctx.channel.send('#' + str(issue.id) + ": " + issue.title + '\n' + issue.web_url)
elif args[0] == 'search':
query = ''.join(args[1:])
project = gl.projects.list(search='Kabot')[0]
find_issues = project.search("issues", query)
for issue in find_issues:
await ctx.channel.send("#" + str(issue['id']) + ": " + issue['title'] + '\n' + issue['web_url'])
elif args[0] == 'add':
title = ' '.join(args[1:])
author = title + ' - By ' + ctx.message.author.name
projects = gl.projects.list()
for project in projects:
if "Kabot" == project.name:
issue = project.issues.create({'title': author})
logger.info("Issue created : %s" % issue.web_url)
else:
await ctx.channel.send('unknown command')
@commands.command(help='count lines numbers in quote file')
async def lines(self, ctx):
path = '/data/log/%s.log' % ctx.channel
with open(path, 'r') as f:
lines = f.read().splitlines()
nb_lines = len(lines)
async with ctx.channel.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("j'ai %s lignes dans mon stock" % nb_lines)
@commands.command(help='check if bot always online')
async def ping(self, message):
await message.channel.send('pong')
@commands.command(help='Restart Bot')
async def restart(self, ctx):
cmd = self.bot.get_command('leave')
await cmd.invoke(ctx)
await self.bot.close()
exit()
@commands.command(help='Update local repo')
async def update(self, message):
output = Popen('git pull'.split(), stdout=PIPE).communicate()[0]
cmd_audio = "git -C %s pull" % self.audio_path
output += Popen(cmd_audio.split(), stdout=PIPE).communicate()[0]
await message.channel.send(output.decode('utf-8'))
@commands.command(help="randomsur l'avenir des gens.")
async def avenir(self, ctx):
list_mot = ("tu seras curé, tu t'occuperas plus spécialement du catéchisme. ",
"tu seras animateur de soirées pour les gays pride. ",
"tu seras gynecologue dans une maison de retraite.",
"tu iras vivre en thaïland à cause de ton job. car tu seras ladyboy dans un bar.",
"tu sera DSI chez jacky et Michel",
"tu seras arroseur de plante aquatique.")
choix = random.choice(list_mot)
async with ctx.channel.typing():
await asyncio.sleep(len(choix) / 4)
await ctx.channel.send(choix)
@commands.command(help="j'adore la cuisine")
async def ciboulette(self, ctx):
cmd = self.bot.get_command('gif')
ctx.message.content = "!gif ciboulette"
await cmd.invoke(ctx)
@commands.command(help='Faire des choix')
async def choice(self, ctx, *words):
choices = random.choice(words)
await ctx.channel.send(choices)
@commands.command(help="optionnal args : ")
async def contrepeterie(self, ctx, *args):
response = None
path = here + '/ressources/contrepeteries.txt'
with open(path) as file:
lines = file.read().splitlines()
myline = random.choice(lines)
question, reponse = myline.split(";")
try:
response = '''Question ! : %s Réponse : ||%s||''' % (question, reponse)
except:
response = "Unknow error, try: !contrepeterie [mot clef]"
await ctx.send(response)
@commands.command(help='Gif me')
async def gif(self, ctx):
query = ctx.message.content.replace('!gif ', '')
api_instance = giphy_client.DefaultApi()
api_key = self.gif_token
lang = 'fr'
if api_key:
try:
api_response = api_instance.gifs_search_get(api_key, query, lang=lang, limit=15)
api_response.to_dict()['data'][0]
get_url = random.choice(api_response.to_dict()['data'])
get_url['url']
await ctx.channel.send(get_url['url'])
except ApiException as e:
await ctx.channel.send("Exception when calling DefaultApi->gifs_search_get: %s\n" % e)
else:
await ctx.channel.send("Exception : No api key found")
@commands.command(help="optionnal args : [livre] [character]")
async def kaamelott(self, ctx, *args):
response = None
url = 'https://kaamelott.chaudie.re/api/random'
if args and ctx.message.content:
args = list(args)
if args[0].isdigit():
livre = int(args[0])
args.pop(0)
elif args[-1].isdigit():
livre = int(args[-1])
args.pop(-1)
else:
livre = None
if args:
perso = ' '.join(args)
if perso and livre:
url = 'https://kaamelott.chaudie.re/api/random/livre/%s/personnage/%s' % (livre, perso)
elif perso:
url = 'https://kaamelott.chaudie.re/api/random/personnage/%s' % perso
else:
url = 'https://kaamelott.chaudie.re/api/random/livre/%s' % livre
try:
citation = requests.get(url).json()['citation']
response = "%s :\n```\n%s\n```" % (citation['infos']['personnage'], citation['citation'])
except:
response = "Unknow error, try: !kaamelott [livre] [character]"
await ctx.send(response)
@commands.command(help="Je menotte une cornemuse et je fume Eddy Malou")
async def kamoulox(self, ctx):
sans_verbe = get_word('nom').text + " " + get_word('complement').get('m') + " et " + get_word('nom').text + " " + get_word('complement').get('m') + "."
nom1 = get_word('nom')
nom2 = get_word('nom')
un1 = "un"
un2 = "un"
if nom1.get('gender') == 'F':
un1 = "une"
if nom2.get('gender') == 'F':
un2 = "une"
phrase1 = get_word('verbe').text + " " + un1 + " " + nom1.text + " " + random.choice([get_word('complement').get('m'), ""])
phrase2 = get_word('verbe').text + " " + un2 + " " + nom2.text + " " + random.choice([get_word('complement').get('m'), ""])
avec_verbe = phrase1 + " et " + phrase2 + "."
piece = random.choice(['pile', 'face'])
if piece == "pile":
result = sans_verbe
elif piece == "face":
result = avec_verbe
async with ctx.channel.typing():
await asyncio.sleep(len(result)/6)
await ctx.send(result)
@commands.dm_only()
@commands.command(help="Faire dire des choses au bot")
async def say(self, ctx, *message):
sentence = ' '.join(message)
channel = [x for x in self.bot.get_all_channels() if x.name == self.text_chan][0]
guild = self.bot.guilds[0]
if sentence.startswith('!'):
command_name = sentence.split()[0].replace('!', '')
cmd = self.bot.get_command(command_name)
ctx.channel = channel
ctx.guild = guild
for word in sentence.split():
if word.startswith('@'):
members = guild.members
for member in members:
if member.nick.lower() == wordi.lower()[1:]:
sentence = sentence.replace(word, member.mention)
ctx.message = sentence
await cmd.invoke(ctx)
else:
for word in sentence.split():
if word.startswith('@'):
members = guild.members
for member in members:
if member.display_name.lower() in word.lower():
sentence = sentence.replace(word, member.mention)
await channel.send(sentence)
@commands.command(help='Who the fuck am i?')
async def schizo(self, ctx, *names):
name = ' '.join(names)
list_name = ["Kabot", "Gaspard et Balthazar", "Bender", "Zoidberg", "Gunther", "MissSaugnacEtCambran2022"]
try:
current_name = self.bot.user.name
list_name.remove(current_name)
except:
pass
if not name:
name = random.choice(list_name)
lower_names = [x.lower() for x in list_name]
if name.lower() in lower_names:
if name:
correct_name = [x for x in list_name if x.lower() in name.lower()][0]
name = correct_name
await self.bot.user.edit(username=name)
else:
name = "404"
await self.bot.user.edit(username="Error 404 name not found!")
await ctx.channel.send("Liste des noms = %s" % list_name)
img = open(here + "/ressources/avatar_bot/%s.jpg" % name, 'rb')
await self.bot.user.edit(avatar=img.read())
await self.joke(ctx, name)
@tasks.loop(seconds=3600)
async def kron(self):
kron_choice = random.choice([self.kaamelott, self.slap, self.kamoulox, self.contrepeterie, schizo, None, None, None, None, None, None])
if kron_choice != None:
await asyncio.sleep(random.choice(range(3550)))
for channel in self.bot.get_all_channels():
if channel.name == self.text_chan:
current_chan = channel
async for msg in current_chan.history(limit=1):
last_message = msg
ctx = await self.bot.get_context(last_message)
ctx.message.content = ""
await kron_choice(ctx)
@commands.command(help='slap this ass')
async def slap(self, ctx, user=None):
slap_multiple = [
"%s prend un coup de pied au cul",
"Descente du coude sur %s",
"%s est propulsé par dessus la TROISIEME CORDE!",
"Le mec en rose, c'est moi et le mec en jaune c'est %s! https://giphy.com/gifs/gSIz6gGLhguOY",
]
if not user or not ctx.message.mentions:
online_members = []
members = ctx.guild.members
for member in members:
if str(member.status) == "online":
online_members.append(member)
user = random.choice(online_members)
user = user.mention
elif ctx.message.mentions:
user = ctx.message.mentions[0]
user = user.mention
if user == self.bot.user.mention:
async with ctx.channel.typing():
await asyncio.sleep(0.5)
await ctx.channel.send("je tribuche par terre et je sais pas comment")
else:
async with ctx.channel.typing():
await asyncio.sleep(len(slap_multiple) / 4)
await ctx.channel.send(random.choice(slap_multiple) % user)
#Commandes pour troll.
@commands.command(help='Troll commands', hidden=True)
async def jke(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def joe(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def jok(self, ctx):
await ctx.channel.send(trollpower())
@commands.command(help='Troll commands', hidden=True)
async def joker(self, ctx):
await ctx.channel.send(trollpower(too_long=True))
@commands.command(help='Troll commands', hidden=True)
async def oke(self, ctx):
await ctx.channel.send(trollpower())
def trollpower(too_long=None):
if too_long:
return("Bah alors, on sait plus écrire, je te donne un indice: t'as une lettre en trop! :sweat_drops: :tongue:")
return('Bah alors, on sait plus écrire, je te donne un indice: il te manque une lettre! :sweat_drops: :tongue:')
def get_word(word_type):
"""Chercher les mots pour la fonction kamoulox dans le fichier xml"""
content = []
with open(here + "/ressources/base_kml.xml", "r", encoding="ISO-8859-1") as file:
content = file.readlines()
content = "".join(content)
bs_content = bs(content, 'lxml')
if word_type == 'nom':
nom = bs_content.resources.nom.find_all('word')
result = random.choice(nom)
return result
elif word_type == 'nom_propre':
nom_propre = bs_content.resources.nompropre.find_all('word')
result = random.choice(nom_propre)
return result
elif word_type == 'verbe':
verbe = bs_content.resources.verbe.find_all('word')
result = random.choice(verbe)
return result
elif word_type == 'complement':
complement = bs_content.resources.complement.find_all('word')
result = random.choice(complement)
return result
elif word_type == 'nom_special':
nom_special = bs_content.resources.complement.find_all('word')
result = random.choice(nom_special)
return result
elif word_type == 'prenom':
prenom = bs_content.resources.prenom.find_all('word')
result = random.choice(prenom)
return result
elif word_type == 'prescuse':
prescuse = bs_content.resources.prescuse.find_all('word')
result = random.choice(prescuse)
return result
elif word_type == 'scuse1':
scuse1 = bs_content.resources.scuse1.find_all('word')
result = random.choice(scuse1)
return result
elif word_type == 'scuse2':
scuse2 = bs_content.resources.scuse2.find_all('word')
result = random.choice(scuse2)
return result
elif word_type == 'presulte':
presulte = bs_content.resources.presulte.find_all('word')
result = random.choice(presulte)
return result
elif word_type == 'sulte':
sulte = bs_content.resources.sulte.find_all('word')
result = random.choice(sulte)
return result
elif word_type == 'postsulte':
postsulte = bs_content.resources.presulte.find_all('word')
result = random.choice(postsulte)
return result
elif word_type == 'mail':
mail = bs_content.resources.mail.find_all('word')
result = random.choice(mail)
return result
else:
result = 'Nique bien ta mère!'
return result

View file

@ -1,10 +0,0 @@
pip==19.2.3
bump2version==0.5.11
wheel==0.33.6
watchdog==0.9.0
flake8==3.7.8
tox==3.14.0
coverage==4.5.4
Sphinx==1.8.5
twine==1.14.0

View file

@ -18,7 +18,7 @@ requirements = [
'aiocron',
'python-gitlab',
'giphy_client',
'youtube_dl',
'yt-dl',
]
setup_requirements = [ ]
@ -28,7 +28,7 @@ test_requirements = [ ]
setup(
author="Michaël Ricart",
author_email='michael.ricart@0w.tf',
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*',
python_requires='>=3.11',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',

View file

@ -1,20 +0,0 @@
[tox]
envlist = py27, py35, py36, py37 flake8
[travis]
python =
3.7: py37
3.6: py36
3.5: py35
2.7: py27
[testenv:flake8]
basepython = python
deps = flake8
commands = flake8 kabot
[testenv]
setenv =
PYTHONPATH = {toxinidir}
commands = python setup.py test

View file

@ -1,36 +0,0 @@
[versions]
Click = 7.0
Jinja2 = 2.10.1
MarkupSafe = 1.1.1
async-timeout = 3.0.1
attrs = 19.1.0
binaryornot = 0.4.4
certifi = 2019.9.11
chardet = 3.0.4
cookiecutter = 1.6.0
future = 0.17.1
idna = 2.8
jinja2-time = 0.2.0
mr.developer = 2.0.0
multidict = 4.5.2
poyo = 0.5.0
requests = 2.22.0
urllib3 = 1.25.6
websockets = 6.0
whichcraft = 0.6.1
yarl = 1.3.0
zc.buildout = 2.13.2
zc.recipe.egg = 2.0.7
# Required by:
# jinja2-time==0.2.0
arrow = 0.15.2
# Required by:
# arrow==0.15.2
python-dateutil = 2.8.0
# Required by:
# mr.developer==2.0.0
six = 1.12.0