"""
Send a message by email to one or more recipients (by SMTP or sendmail).
"""
# Copyright (c) 2009-2024, UChicago Argonne, LLC. See LICENSE file.
import os
import sys
SMTP_TIMEOUT = 10
[docs]
class MailerError(Exception):
pass
[docs]
def sendMail_sendmail(
subject, message, recipients, sendmail_cfg, sender=None, logger=None
):
"""
Send an email message using sendmail (linux only).
:param str subject: short text for email subject
:param str message: full text of email body
:param [str] recipients: list of email addresses to receive the message
:param dict sendmail_cfg: such as returned from :mod:`PvMail.ini_config.Config.get`
:user: required - (*str*) username to for sendmail (or similar) program
:param str sender: "From" address, if *None* use *smtp_cfg['user']* value
:param obj logger: optional message logging method
EXAMPLE::
>>> import PvMail.ini_config
>>> sendmail_cfg = PvMail.ini_config.Config().get()
>>> recipients = ['joe@gmail.com', 'sally@example.org']
>>> subject = 'sendmail test message'
>>> message = PvMail.ini_config.__doc__
>>> sendMail_sendmail(subject, message, recipients, sendmail_cfg)
"""
if sys.platform not in ("linux2"):
raise MailerError(f"Cannot use this method on {sys.platform=!r}")
sender = sender or sendmail_cfg["user"]
if isinstance(recipients, str):
recipients = [
recipients,
]
def _sendmail_handler(email_program, from_addr, recipients, subject, message):
to_addr = str(" ".join(recipients))
mail_command = "%s -F %s -t %s" % (email_program, from_addr, to_addr)
mail_message = [
mail_command,
]
for who in recipients:
mail_message.append("To: " + who)
mail_message.append("Subject: " + subject)
mail_message.append(message)
cmd = """cat << +++ | %s\n+++""" % "\n".join(mail_message)
return mail_command, cmd
def _mail_handler(email_program, from_addr, recipients, subject, message):
# to_addr = str(" ".join(recipients))
# mail_command = "%s %s" % (email_program, to_addr)
# mail_message = "%s\n%s" % (subject, message)
# # TODO: needs to do THIS
# """
# cat /tmp/message.txt | mail jemian@anl.gov
# """
# # cmd = '''echo %s | %s''' % (mail_message, mail_command)
# # return mail_command, cmd # lgtm [py/unreachable-statement]
raise MailerError("code needs improvement here")
cmd = None
for email_program, handler in (
("/usr/lib/sendmail", _sendmail_handler),
("/usr/bin/sendmail", _sendmail_handler),
("/usr/bin/mail", _mail_handler),
):
if os.path.exists(email_program):
results = handler(email_program, sender, recipients, subject, message)
mail_command, cmd = results
break
if cmd is None:
raise MailerError("Cannot find mail transport agent for sendmail.")
if logger is not None:
logger("sending email to: " + str(recipients))
logger("email program: " + email_program)
logger("mail command: " + mail_command)
logger("email From: " + sender)
try:
if logger is not None:
logger("email command:\n" + cmd)
if os.path.exists(email_program):
os.popen(cmd) # send the message
else:
if logger is not None:
logger("email program (%s) does not exist" % email_program)
except Exception as reason:
# err_msg = traceback.format_exc()
final_msg = f"{cmd=!r}\ntraceback: {reason}"
logger(final_msg)
raise MailerError(final_msg)
if logger is not None:
logger("sendmail sent")
[docs]
def sendMail_SMTP(subject, message, recipients, smtp_cfg, sender=None, logger=None):
"""
send email message through SMTP server
:param str subject: short text for email subject
:param str message: full text of email body
:param [str] recipients: list of email addresses to receive the message
:param dict smtp_cfg: such as returned from :mod:`PvMail.ini_config.Config.get`
:server: required - (*str*) SMTP server
:user: required - (*str*) username to login to SMTP server
:port: optional - (*str*) SMTP port
:password: optional - (*str*) password for username
:connection_security: optional - (*str*) **STARTTLS** (the only choice, if specified)
:param str sender: "From" address, if *None* use *smtp_cfg['user']* value
EXAMPLE::
>>> import PvMail.ini_config
>>> smtp_cfg = PvMail.ini_config.Config().get()
>>> recipients = ['joe@gmail.com', 'sally@example.org']
>>> subject = 'SMTP test message'
>>> message = PvMail.ini_config.__doc__
>>> sendMail_SMTP(subject, message, recipients, smtp_cfg)
"""
import email
import smtplib
host = smtp_cfg.get("server", None)
if host is None:
raise MailerError("must define an SMTP host to be used")
port = smtp_cfg.get("port", None)
username = smtp_cfg.get("user", None)
if username is None:
raise MailerError("must define a username for the SMTP server")
if sender is None:
sender = username
password = smtp_cfg.get("password", None)
connection_security = smtp_cfg.get("connection_security", None)
if connection_security not in (None, "STARTTLS"):
msg = "connection_security must be: STARTTLS or not defined, found: "
raise MailerError(msg + connection_security)
msg = email.message.Message()
if isinstance(recipients, str):
msg["To"] = recipients
else:
for who in recipients:
msg["To"] = who
msg["From"] = sender
msg["Subject"] = subject
msg.set_payload(message)
if logger is not None:
logger("sending email to: " + str(recipients))
logger("SMTP server: " + host)
logger("SMTP user: " + username)
logger("email From: " + sender)
smtpserver = smtplib.SMTP(timeout=SMTP_TIMEOUT)
# smtpserver.set_debuglevel(1)
if port is None:
smtpserver.connect(host)
else:
smtpserver.connect(host, int(port))
if logger is not None:
logger("SMTP connected")
smtpserver.ehlo()
if smtp_cfg.get("connection_security", None) == "STARTTLS":
smtpserver.starttls()
smtpserver.ehlo()
if logger is not None:
logger("SMTP STARTTLS")
if password is not None:
smtpserver.login(username, password)
if logger is not None:
logger("SMTP authenticated")
smtpserver.sendmail(username, recipients, str(msg))
smtpserver.close()
if logger is not None:
logger("SMTP complete")
[docs]
def send_message(subject, message, recipients, config):
"""
send an email message
:param str subject: short text for email subject
:param str message: full text of email body
:param [str] recipients: list of email addresses to receive the message
:param dict config: such as returned from :mod:`PvMail.ini_config.Config`
"""
email_agent_dict = dict(sendmail=sendMail_sendmail, SMTP=sendMail_SMTP)
agent = email_agent_dict[config.mail_transfer_agent]
agent(subject, message, recipients, config.get())
[docs]
def main():
"""
User on-demand test of the mailer module and configuration.
"""
import argparse
from . import DOCS_URL
from . import __version__
from . import ini_config
doc = f"Test the email sender from PvMail {__version__}"
parser = argparse.ArgumentParser(description=doc)
msg = "email address(es), whitespace-separated if more than one"
parser.add_argument("recipient", action="store", nargs="+", help=msg, default="")
results = parser.parse_args()
print(doc)
cfg = ini_config.Config()
print("Using configuration from: " + cfg.ini_file)
print("Using user name: " + cfg.get()["user"])
recipients = results.recipient
print("Sending email(s) to: " + str(" ".join(recipients)))
print("mail transfer agent: " + cfg.mail_transfer_agent)
subject = "PvMail mailer test message: " + cfg.mail_transfer_agent
message = f"{doc}\nFor more help, see: {DOCS_URL}"
send_message(subject, message, recipients, cfg)