在Python中使用Kafka检测流媒体应用程序中的欺诈交易

2021年11月11日02:31:26 发表评论 763 次浏览

Python如何检测欺诈交易?了解如何使用 Apache Kafka 和 Python API 检测和过滤流应用程序中的欺诈交易。

Python Kafka检测流媒体欺诈交易:随着系统的扩展和迁移,打击欺诈者已成为最重要的挑战,最好使用实时流处理技术来解决。

Python Kafka检测欺诈交易教程:出于本教程的目的,我们将从头开始构建一个实时欺诈检测系统,在该系统中我们将生成一个虚构的交易流并同步分析它们以检测哪些是欺诈的。

先决条件

根据我们的要求,我们需要一个可靠、可扩展和容错的事件流平台,用于存储我们的输入事件或交易和处理结果。Apache Kafka是一个开源分布式流媒体平台,正好满足了这一需求。

Apache Kafka 可以从其官方站点下载,在你的操作系统上安装和运行 Apache Kafka 不在本教程的范围内。但是,你可以查看本指南,了解如何在 Ubuntu(或任何基于 Debian 的发行版)上安装它。

完成所有设置后,你可以检查以下内容:

  • Zookeeper 实例正在TCP 端口 2181上运行。
  • 一个 Kafka 实例正在运行并绑定到TCP 端口 9092

如果上述控制都得到满足,那么现在你有一个单节点的 Kafka 集群启动并运行。

创建应用程序骨架

我们现在可以开始使用 Kafka 的消费者和生产者 API 构建我们的实时欺诈检测应用程序。

我们的应用程序将包括:

  • 一个交易生成器,一方面,它产生虚构的交易来模拟事件流。
  • 另一方面,欺诈检测器可过滤掉看起来可疑的交易。

以下流程图展示了我们的设计:

在Python中使用Kafka检测流媒体应用程序中的欺诈交易

Python如何检测欺诈交易?让我们直接进入设置。当然,你需要在系统上安装 Python 3。我将使用一个虚拟环境来安装所需的库,这无疑是选择的最佳方法:

  • 创建一个虚拟环境并激活它:$ python -m venv fraud-detector $ source fraud-detector/bin/activate复制
  • 创建文件requirements.txt并向其添加以下行:Kafka-python==2.0.2 Flask==1.1.2复制
  • 安装库:$ pip install -r requirements.txt复制

在本教程结束时,文件夹结构将如下所示:

在Python中使用Kafka检测流媒体应用程序中的欺诈交易清楚了这一点,让我们现在开始编写实际代码。

首先,让我们在我们的settings.py文件中初始化我们的参数:

# URL for our broker used for connecting to the Kafka cluster
KAFKA_BROKER_URL   = "localhost:9092"
# name of the topic hosting the transactions to be processed and requiring processing
TRANSACTIONS_TOPIC = "queuing.transactions"
# these 2 variables will control the amount of transactions automatically generated
TRANSACTIONS_PER_SECOND = float("2.0")
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
# name of the topic hosting the legitimate transactions
LEGIT_TOPIC = "queuing.legit"
# name of the topic hosting the suspicious transactions
FRAUD_TOPIC = "queuing.fraud"

注意:为简洁起见,我将配置参数硬编码在 中settings.py,但建议将这些参数存储在单独的文件中(例如 .env)

其次,让我们创建一个 Python 文件,transactions.py用于动态创建随机交易:

from random import choices, randint
from string import ascii_letters, digits

account_chars: str = digits + ascii_letters

def _random_account_id() -> str:
    """Return a random account number made of 12 characters"""
    return "".join(choices(account_chars,k=12))

def _random_amount() -> float:
    """Return a random amount between 1.00 and 1000.00"""
    return randint(100,1000000)/100

def create_random_transaction() -> dict:
    """Create a fake randomised transaction."""
    return {
        "source":_random_account_id()
       ,"target":_random_account_id()
       ,"amount":_random_amount()
       ,"currency":"EUR"
    }

Python Kafka检测欺诈交易教程:第三,让我们构建我们的交易生成器,它将用于创建交易流。Python 文件producer.py将扮演交易生成器的角色,并将发布的交易存储在名为 的主题中queuing.transactions,以下是 的代码producer.py

import os
import json
from time import sleep
from kafka import KafkaProducer
# import initialization parameters
from settings import *
from transactions import create_random_transaction


if __name__ == "__main__":
   producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL
                            #Encode all values as JSON
                           ,value_serializer = lambda value: json.dumps(value).encode()
                           ,)
   while True:
       transaction: dict = create_random_transaction()
       producer.send(TRANSACTIONS_TOPIC, value= transaction)
       print(transaction) #DEBUG
       sleep(SLEEP_TIME)

为了确保你走在正确的轨道上,让我们测试一下producer.py程序。为此,请打开终端窗口并键入以下内容:

$ python producer.py

注意:在执行测试之前,请确保 Kafka 服务器正在运行。

你应该会看到类似于以下内容的输出:

在Python中使用Kafka检测流媒体应用程序中的欺诈交易第四,在我们保证我们的生产者程序启动并运行之后,让我们现在开始构建欺诈检测机制来处理交易流并查明欺诈交易。

Python Kafka检测流媒体欺诈交易:我们将开发这个程序的两个版本:

  • 版本 1detector.py:该程序将根据特定标准或一组标准过滤掉排队的交易,并将结果输出到两个单独的主题中:一个用于合法交易LEGIT_TOPIC,另一个FRAUD_TOPIC用于满足我们选择的标准的欺诈交易。

该程序基于Kafka Python 消费者 API,该 API 允许消费者订阅特定的 Kafka 主题,只要这些消息正在发布,Kafka 就会自动向他们广播消息。

下面是代码detector.py

import os
import json
from kafka import KafkaConsumer, KafkaProducer
from settings import *

def is_suspicious(transaction: dict) -> bool:
    """Simple condition to determine whether a transaction is suspicious."""
    return transaction["amount"] >= 900

if __name__ == "__main__":
   consumer = KafkaConsumer(
       TRANSACTIONS_TOPIC
      ,bootstrap_servers=KAFKA_BROKER_URL
      ,value_deserializer = lambda value: json.loads(value)
      ,
   )

   for message in consumer:
       transaction: dict = message.value
       topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
       print(topic,transaction) #DEBUG

为简单起见,我选择了一个is_suspicious()基于简单谓词的基本条件函数(即如果交易金额大于等于900,则为可疑)。但是,在现实生活场景中,可能会涉及到很多参数,其中:

  • 无论是活动的、非活动的还是休眠的,发起方都负责交易。
  • 交易的位置,如果是从一个在锁定期间应该关闭的实体发起的。

这些场景将构成欺诈管理解决方案的核心,应仔细设计以确保该解决方案的灵活性和响应能力。

现在让我们测试producer.pydetector.py,打开一个新的终端窗口并输入以下内容:

$ python producer.py

同时,打开另一个窗口并输入:

$ python detector.py

注意:在执行测试之前,请确保 Kafka 服务器正在运行。

你将在 上看到与此类似的输出detector.py

在Python中使用Kafka检测流媒体应用程序中的欺诈交易

Python如何检测欺诈交易?程序中包含的调试打印会将交易输出到控制台,并根据我们指定的条件和相关的交易金额指示目标队列。

  • 版本 2appdetector.py:这是检测器.py 脚本的高级版本,它将使用 Flask 微框架将交易流式传输到网络。

此代码中使用的技术如下:

  • Flask:Python 中的微型 Web 框架。
  • 服务器发送事件 (SSE):一种服务器推送机制,其中客户端订阅服务器生成的更新流,并且每当发生新事件时,都会向客户端发送通知。
  • Jinja2:在 Python 生态系统中广泛使用的现代模板引擎。Flask 默认支持 Jinja2。

以下是appdetector.py

from flask import Flask, Response, stream_with_context, render_template, json, url_for

from kafka import KafkaConsumer
from settings import *

# create the flask object app
app = Flask(__name__)

def stream_template(template_name, **context):
    print('template name =',template_name)
    app.update_template_context(context)
    t = app.jinja_env.get_template(template_name)
    rv = t.stream(context)
    rv.enable_buffering(5)
    return rv

def is_suspicious(transaction: dict) -> bool:
    """Determine whether a transaction is suspicious."""
    return transaction["amount"] >= 900

# this router will render the template named index.html and will pass the following parameters to it:
# title and Kafka stream
@app.route('/')
def index():
    def g():
        consumer = KafkaConsumer(
            TRANSACTIONS_TOPIC
            , bootstrap_servers=KAFKA_BROKER_URL
            , value_deserializer=lambda value: json.loads(value)
            ,
        )
        for message in consumer:
            transaction: dict = message.value
            topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
            print(topic, transaction)  # DEBUG
            yield topic, transaction

    return Response(stream_template('index.html', title='Fraud Detector / Kafka',data=g()))

if __name__ == "__main__":
   app.run(host="localhost" , debug=True)

接下来,我们将定义路由函数index.html使用的模板HTML 文件,该文件index()位于templates文件夹下:

<!doctype html>
<title> Send Javascript with template demo </title>
<html>
<head>
</head>
<body>
    <div class="container">
        <h1>{{title}}</h1>
    </div>
    <div id="data"></div>
    {% for topic, transaction in data: %}
    <script>
        var topic = "{{ topic }}";
        var transaction = "{{ transaction }}";
        if (topic.search("fraud") > 0) {
            topic = topic.fontcolor("red")
        } else {
            topic = topic.fontcolor("green")
        }
        document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;
    </script>
    {% endfor %}
</body>
</html>

index.html包含一个Javascript实现遍历整个接收到的数据流和他们收到以显示交易。

现在运行它,确保producer.py正在运行,然后:

$ python appdetector.py

Python Kafka检测流媒体欺诈交易:它应该在端口 5000 上启动一个本地服务器,转到你的浏览器并访问http://localhost:5000Flask 实例运行的位置,你将看到合法和欺诈交易的连续流,如下面的屏幕所示:

在Python中使用Kafka检测流媒体应用程序中的欺诈交易你可以在此处查看完整代码。

本Python Kafka检测欺诈交易教程说明了如何在欺诈检测应用程序中应用流处理范例。

木子山

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: