python获取微信企业号打卡数据并生成windows计划任务

由于公司的系统用的是Java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,所以只能自己捣鼓这个,以下是个人设置的一些内容,仅供大家参考

安装python

python的安装,这里就不详细写了,大家可自行度娘或google。

安装第三方库

python安装好之后别忘记配置环境变量!另外,所以的内容都是安装在服务器上的,且服务器需要能够上外网,否则,只能配置在本地,因为需要外网连接微信企业号的接口。这里需要用到几个第三方库:

python的pip命令,一般python安装好之后都会默认有,如果不确定,可输入命令查询,通过cmd进入命令提示符,输入

pip list

如果提示你需要更新,你可以更新,也可以不更新,更新命令其实给到你了python -m pip install --upgrade pip

安装所需要的库

Step.1

pip install pymssql

如果安装pymssql出错,提示什么visual C++ 14,则先安装wheel,如不报错则忽略step2、step3

Step.2

pip install wheel

Step.3

下载pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

可去这里下载最新版本的。pymssql下载

下载好之后,进入该文件所在的目录,通过pip install安装即可cd D:

pip install pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

step.4

pip install requests

至此,所有第三方库都配置好了。

写主程序

# !/usr/bin/python
# -*- coding:utf-8 -*-
# @Time: 2018/7/26 16:05
# @Author: hychen.cc
import json # 因微信企业号返回的格式为json,所以引入json
import requests
import pymssql
import math # 引入数学方法
import time
import datetime
server = 'XX.XX.XX.XX' # 数据库服务器地址
user = 'sa' # 数据库登录名,可以用sa
password = '******' # 数据库用户对应的密码
dbName = 'DBNAME' # 数据库名称
CORP_ID = 'XXXXXX' # 微信企业号提供的CORP_ID
CORP_SECRET = 'XXXXX' # 微信企业号提供的CORP_SECRET
"""

因微信接口所需要unix时间戳,所以需要把时间转为为Unix时间戳格式

定义时间转换为Unix时间方法

"""def datetime_timestamp(dt):
 # dt为字符串
 # 中间过程,一般都需要将字符串转化为时间数组
 time.strptime(dt, '%Y-%m-%d %H:%M:%S')
 ## time.struct_time(tm_year=2018, tm_mon=10, tm_mday=25, tm_hour=10, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=88, tm_isdst=-1)
 # 将"2018-10-25 10:00:00"转化为时间戳
 s = time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S'))
 return int(s)
# 定义连接数据库方法
def get_link_server():
 connection = pymssql.connect(server, user, password, database=dbName)
 if connection:
  return connection
 else:
  raise ValueError('Connect DBServer failed.')
"""

定义获取用户列表,因为微信企业号一次最大只能获取100个,所以需要转换为列表格式,分批次获取

我这里设置是从DB中获取有权限微信打卡的人员(Select * From Table),换成自己的方式即可

"""
def get_userid_list():
 """
 获取用户列表
 :return:
 """
 conn = get_link_server()
 cursor = conn.cursor()
 sql = "Select * From Table"
 cursor.execute(sql)
 row = cursor.fetchone()
 userlist = []
 while row:
  userlist.append(row[0])
  row = cursor.fetchone()
 if userlist:
  return userlist
 else:
  raise ValueError('Get Userlist failed.')
 conn.close()
"""

获取Access_Token,因为Token有时效(2小时),所以需要存在本地,这样不需要频繁调用,所以我定义了存储过程(sP_GetWX_access_token)来判断之前存储的token是否有效,有效的话就不需要重复获取了

"""
def get_access_token(refresh=False):
 """
 获取Access Token
 :return:
 """
 if not refresh:
  API_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
   CORP_ID, CORP_SECRET)
  response = requests.get(API_ACCESS_TOKEN_URL, verify=False)
  if response.status_code == 200:
   rep_dict = json.loads(response.text)
   errcode = rep_dict.get('errcode')
   if errcode:
    raise ValueError('Get wechat Access Token failed, errcode=%s.' % errcode)
   else:
    access_token = rep_dict.get('access_token')
    if access_token:
     conn = get_link_server()
     cursor = conn.cursor()
     cursor.execute('exec sP_GetWX_access_token @Access_Token=%s', access_token)
     conn.commit()
     conn.close()
     return access_token
    else:
     raise ValueError('Get wechat Access Token failed.')
  else:
   raise ValueError('Get wechat Access Token failed.')
 else:
  conn = get_link_server()
  cursor = conn.cursor()
  cursor.execute("Select Access_Token From wx_AccessToken Where ID=1")
  access_token = cursor.fetchone()
  if access_token:
   return access_token[0]
   conn.close()
  else:
   API_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
    CORP_ID, CORP_SECRET)
   response = requests.get(API_ACCESS_TOKEN_URL, verify=False)
   if response.status_code == 200:
    rep_dict = json.loads(response.text)
    errcode = rep_dict.get('errcode')
    if errcode:
     raise ValueError('Get wechat Access Token failed, errcode=%s.' % errcode)
    else:
     access_token = rep_dict.get('access_token')
     if access_token:
      conn = get_link_server()
      cursor = conn.cursor()
      cursor.execute('exec sP_GetWX_access_token @Access_Token=%s', access_token)
      conn.commit()
      conn.close()
      return access_token
     else:
      raise ValueError('Get wechat Access Token failed.')
   else:
    raise ValueError('Get wechat Access Token failed.')
# 获取微信打卡的json格式
def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist):
 API_PUNCH_CARD_URL = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
 json_str = json.dumps(
  {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime, 'useridlist': useridlist})
 response = requests.post(API_PUNCH_CARD_URL, data=json_str, verify=False)
 if response.status_code == 200:
  rep_dic = json.loads(response.text)
  errcode = rep_dic.get('errcode')
  if errcode == 42001:
   access_token = get_access_token(True)
   API_PUNCH_CARD_URL = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
   json_str = json.dumps(
    {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime,
    'useridlist': useridlist})
   response = requests.post(API_PUNCH_CARD_URL, data=json_str, verify=False)
   rep_dic = json.loads(response.text)
   errcode = rep_dic.get('errcode')
   if errcode:
    raise ValueError('Get punch data failed1, errcode=%s' % errcode)
   else:
    value_str = rep_dic.get('checkindata')
    if value_str:
     return value_str
    else:
     raise ValueError('Get punch data failed2.')
  elif errcode:
   raise ValueError ('Get punch data failed3, errcode=%s' % errcode)
  else:
   value_str = rep_dic.get('checkindata')
   if value_str:
    return value_str
   else:
    raise ValueError('I do not find employee punch data.')
 else:
  raise ValueError ('Get punch data failed5.')
# 调用接口,获得数据
if __name__ == '__main__':
 today = datetime.date.today()
 oneday = datetime.timedelta(days=3) # days,即获取几天内的
 yesterday = today - oneday
 starttime = datetime_timestamp(yesterday.strftime('%Y-%m-%d') + ' 00:00:00')
 endtime = datetime_timestamp(today.strftime('%Y-%m-%d') + ' 23:59:59')
 opencheckindatatype = 3
 access_token = get_access_token()
 if access_token:
  useridlist = get_userid_list()
  if useridlist:
   step = 100
   total = len(useridlist)
   n = math.ceil(total/step)
   for i in range(n):
    # print (useridlist[i*step:(i+1)*step])
    punch_card = get_punchcard_info(access_token, opencheckindatatype, starttime, endtime,useridlist[i*step:(i+1)*step])
    # print (punch_card)
    if punch_card:
     conn = get_link_server()
     cursor = conn.cursor()
     for dic_obj in punch_card:
      cursor.execute('exec sp_AnalysisPunchCard @Json=%s',
          (json.dumps(dic_obj, ensure_ascii=False)))
      # print((json.dumps(dic_obj, ensure_ascii=False))),sp_AnalysisPunchCard把获取到的数据解析后存入数据库中
      conn.commit()
     conn.close()
     print ('Get punch card successed.')
    else:
     raise ValueError('No userlist exists')

设置Windows计划任务

通过控制面板-管理工具-任务计划程序,右击选择创建基本任务,这里注意的是路径和程序。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

程序或脚本:python.exe

添加参数(可选)(A):你的py文件目录

起始于:python目录,如果不知道python安装到哪去了,按照下列cmd命令,输入python后进入python命令查询

import sys
sys.prefix,回车

到此,配置完成,可自行右击任务-执行查询效果,或者通过python命令执行py文件

进入到py文件目录

python xxx.py

总结

以上所述是小编给大家介绍的python获取微信企业号打卡数据并生成windows计划任务,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对来客网网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!