首页 > 编程知识 正文

pyspark获取当前时间,pysparksql计算百分比

时间:2023-05-06 16:21:31 阅读:257817 作者:2362

在实际业务中,在某一行的计算需要利用到改行前后的一些信息,例如,当前时间前1天内的汇总,或当前时间前1h的最大值和当前值的差值等等

在spark 1.4之后,提供了sql.windows函数,其形如:

from pyspark.sql import Window>>> window = Window..partitionBy("country").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

在这里需要明确几个窗口计算的概念:

partitionBy:分组,所有的通过rowsBetween和rangeBetween切割出来的帧都是在分组的基础上的;orderBy:排序,这个比较好理解,就是按照那个字段排序rowsBetween/rangeBetween :这个需要明确,rowBetween是当前行的前或者后几行,rangeBetween是针对orderby的值计算出来的范围再和orderby比较来得到时间帧,下面这个图来辅助理解:
这是rowBetween
这是rangeBetween:



这时有个问题,如果我们需要排序的字段是timestamp类型,rangeBetween中是不能写datetime.timedleta的,所有需要将其转换一下,方式如下: from pyspark import HiveContextfrom pyspark.sql.types import TimestampTypefrom pyspark.sql import functions as Ffrom pyspark.sql.functions import udffrom pyspark.sql.window import Windowfrom pyspark.sql.functions import colimport datetimesql_context = HiveContext(sc)# 创建测试数据集test_df = sql_context.createDataFrame([ ('2019-05-26 01:05:00.600', "a", "c1"), ('2019-05-26 01:05:00.900', "a", "c1"), ('2019-05-26 01:05:01.900', "a", "c1"), ('2019-05-26 01:06:01.900', "a", "c2"), ('2019-05-26 02:06:01.900', "a", "c2"), ('2019-05-26 01:05:00.000', "b", "c2"), ('2019-05-26 01:05:01.000', "b", "c2"), ('2019-05-26 01:05:02.000', "b", "c3"), ('2019-05-26 01:06:02.000', "b", "c3"), ('2019-05-26 01:08:02.000', "b", "c3"), ('2019-05-26 02:08:02.000', "b", "c4"), ('2019-05-27 02:08:02.000', "b", "c4"),], ["id", "category", "C_type"])def str2time(date_str): try: cur_date = datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%f") except ValueError: cur_date = datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") except Exception: cur_date = datetime.datetime.utcfromtimestamp(0) return cur_datestr2time_udf = udf(str2time,TimestampType())# 增加一列timestamp变量test_df1 = test_df.withColumn("datetime",str2time_udf("id"))# 准备将其转换为对应的秒数days = lambda i: i*86400hours = lambda i: i*3600mins = lambda i: i*60# 创建一个时间窗口,取当前行的前1分钟作为时间帧window_spec = Window.partitionBy("category").orderBy(col("datetime").cast("long")).rangeBetween(-mins(1), 0)# 在该时间帧上应用计数函数test_df1.withColumn("c_count", F.count("C_type").over(window_spec)).show(truncate=False)

最终得到的结果如下:

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。