## 1. Why Airflow Is Needed

OpenMetadata uses Airflow as its default workflow orchestration engine (the Pipeline Service Client). Its main job is to automate and manage metadata ingestion jobs.

### Core roles

#### 1. Orchestrating ingestion workflows (Orchestration)

OpenMetadata's Ingestion Framework is written in Python. OpenMetadata needs a mature scheduler to run these Python workloads for scheduled metadata collection, data profiling (the Profiler) and lineage tracking. Airflow provides task scheduling, retries and log monitoring.

File: `ingestion/src/airflow_provider_openmetadata/lineage/runner.py` (L85-100)

```python
class AirflowLineageRunner:
    """
    Given the OpenMetadata connection, a service name and a DAG:

    1. Create the Pipeline Service (if not exists)
    2. Create or update the Pipeline (DAG + tasks)
    3. Add the task status (Task Instances). We'll pick this up from the available information.
       This operator should run the last to have the complete view.
    4. Add Pipeline Lineage from xlets

    This Runner will be called either from:
    1. Lineage Backend
    2. Lineage Operator
    In both cases, this will run directly on an Airflow instance. Therefore,
    we'll use the airflow config data to populate entities' details.
    """
```

#### 2. Dynamic DAG management (Managed APIs)

The `openmetadata-managed-apis` plugin lets OpenMetadata Server deploy, trigger and stop ingestion DAGs in Airflow directly over a REST API. You do not have to write Airflow DAG code by hand, because all of this can be done from the OpenMetadata UI.

File: `openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java` (L49-72)

```java
public class AirflowRESTClient extends PipelineServiceClient {
  private static final String PLATFORM = "Airflow";
  private static final String USERNAME_KEY = "username";
  private static final String PASSWORD_KEY = "password";
  private static final String TIMEOUT_KEY = "timeout";
  private static final String TRUSTSTORE_PATH_KEY = "truststorePath";
  private static final String TRUSTSTORE_PASSWORD_KEY = "truststorePassword";
  private static final String DOCS_LINK =
      "Follow [this guide](https://docs.open-metadata.org/deployment/ingestion/openmetadata) for further details.";

  protected final String username;
  protected final String password;
  protected final HttpClient client;
  protected final URL serviceURL;
  private volatile List<String> apiEndpointSegments;
  private static final String DAG_ID = "dag_id";
  private static final String CONF = "conf";
  private static final String APP_CONFIG_OVERRIDE = "appConfigOverride";
  private String detectedAirflowVersion = null;
  private final Object detectionLock = new Object();
  private volatile String csrfToken = null;
  private volatile List<String> sessionCookies = null;
  // ... (rest of the class omitted from this excerpt)
}
```

#### 3. Airflow as a metadata source (Metadata Source)

Airflow itself is also an important data asset. Through `AirflowSource`, OpenMetadata extracts Airflow pipelines, tasks and their run statuses. Depending on the connection type, it also picks up the data lineage between tasks.

File: `ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py` (L11-13)

```python
"""
Airflow source to extract metadata from OM UI
"""
```

File: `ingestion/src/airflow_provider_openmetadata/lineage/backend.py` (L34-45)

```python
class OpenMetadataLineageBackend(LineageBackend):
    """
    Sends lineage data from tasks to OpenMetadata.

    Configurable via `airflow.cfg` as follows:

    [lineage]
    backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
    airflow_service_name = airflow
    openmetadata_api_endpoint = http://localhost:8585/api
    jwt_token = <token>  # To auth to the OpenMetadata API
    """
```

### Technical architecture

**Java backend and Airflow.** OpenMetadata Server uses `AirflowRESTClient` to call the plugin endpoints that Airflow exposes.

**Version detection.** The client automatically detects whether it is talking to Airflow 2.x or 3.x. It probes the Airflow 3.x prefix `/pluginsv2/api/v2/openmetadata` first.

File: `openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java` (L113-146)

```java
private List<String> detectAirflowApiVersion() {
  // Try Airflow 3.x with /pluginsv2 prefix first
  try {
    List<String> v3Segments = List.of("pluginsv2", "api", "v2", "openmetadata");
    URIBuilder v3Builder = new URIBuilder(String.valueOf(serviceURL));
    List<String> segments = new ArrayList<>(v3Builder.getPathSegments());
    segments.addAll(v3Segments);
    segments.add("health-auth");
    v3Builder.setPathSegments(segments);

    HttpRequest request =
        HttpRequest.newBuilder(v3Builder.build())
            .header(CONTENT_HEADER, CONTENT_TYPE)
            .header(AUTH_HEADER, getBasicAuthenticationHeader(username, password))
            .GET()
            .timeout(Duration.ofSeconds(5))
            .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() == 200) {
      try {
        JSONObject responseJSON = new JSONObject(response.body());
        detectedAirflowVersion = responseJSON.getString("version");
        LOG.info(
            "Detected Airflow version {} - using /pluginsv2/api/v2 endpoints (Airflow 3.x)",
            detectedAirflowVersion);
        return v3Segments;
      } catch (Exception parseError) {
        LOG.debug(
            "Received 200 response from /pluginsv2 health-auth but failed to parse version: {}",
            parseError.getMessage());
      }
    }
    // ... (excerpt ends here; the remaining fallback logic is omitted)
```

**Configuration.** In `openmetadata.yaml`, `pipelineServiceClientConfiguration` points to the Airflow REST client by default.

File: `conf/openmetadata.yaml` (L544-549)

```yaml
pipelineServiceClientConfiguration:
  enabled: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
  # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
  className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient}
  apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
  metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
```

### Lifecycle of an ingestion job

| Stage | Action | Components involved |
|---|---|---|
| Deploy | Clicking "Schedule" in the UI makes the server call the API to create the DAG | `AirflowRESTClient` → managed-apis |
| Execute | The Airflow scheduler starts a worker that runs the Python ingestion code | airflow-env (Python ingestion) |
| Monitor | The UI shows task success/failure status and live logs | `AirflowRESTClient` → managed-apis |

**Extensibility.** Airflow is the officially recommended and default engine, but OpenMetadata's design is pluggable. In `openmetadata.yaml`, `className` can be set to another implementation, such as `NoopClient` or a custom Kubernetes client. For now, Airflow offers the most complete feature support.

### Notes

**Decoupled deployment.** OpenMetadata relies on Airflow for scheduling, but you can use an existing external Airflow cluster, such as AWS MWAA or GCP Composer. You only need to configure the connection details.

File: `openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/Airflow.md` (L40-46)

```markdown
## Airflow REST API Connection

The REST API connection calls the Airflow web server over HTTP/HTTPS and does not require direct access to Airflow's metadata database. This makes it the right choice for managed deployments (Astronomer, GCP Cloud Composer, MWAA) and for any self-hosted Airflow where direct DB access is not available or desired.

$$note
The REST API connection fetches DAG topology, task structure, schedules, and run statuses. **Lineage is not captured through this connection.** To get table-level and column-level lineage in OpenMetadata, you must separately install the <a href="https://docs.open-metadata.org/connectors/pipeline/airflow/lineage-backend" target="_blank">OpenMetadata Lineage Backend</a> in Airflow (strategy 2) or use the <a href="https://docs.open-metadata.org/connectors/pipeline/airflow/lineage-operator" target="_blank">Lineage Operator</a> in your DAGs (strategy 3). Once those emit OpenLineage events, lineage edges will appear automatically in OpenMetadata.
$$
```

**Airflow 3.x.** Recent OpenMetadata releases fully support Airflow 3.x, including its new API paths and environment requirements.

## 2. Example: Testing a Data Source Connection

The **Test Connection** button for a data source in the UI depends entirely on Airflow being available. `TestConnection.tsx` explicitly checks `isAirflowAvailable` and disables the button when Airflow is unavailable.

File: `openmetadata-ui/src/main/resources/ui/src/components/common/TestConnection/TestConnection.tsx` (L128-132)

```typescript
const isTestConnectionDisabled =
  isTestingConnection ||
  isTestingDisabled ||
  !allowTestConn ||
  !isAirflowAvailable;
```

### End-to-end call chain

```text
UI (browser)
 └─ POST /api/v1/automations/workflows/{id}/trigger
     └─ Java backend (Windows :8585)
         └─ AirflowRESTClient.runAutomationsWorkflow()
             └─ POST http://localhost:8080/run_automation
                 └─ Airflow (WSL :8080)
                     └─ execute(automation_workflow)
                         └─ ingestion connects directly to the target database
                             └─ PATCH: result written back to the Java backend
```

The Java backend calls Airflow's `/run_automation` endpoint through `AirflowRESTClient`:

File: `openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java` (L511-540)

```java
@Override
public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
  HttpResponse<String> response;
  try {
    String automationsUrl = buildURI("run_automation").build().toString();
    String workflowPayload = JsonUtils.pojoToJson(workflow);
    response = post(automationsUrl, workflowPayload);
    if (response.statusCode() == 200) {
      return getResponse(200, response.body());
    }
  } catch (IOException | URISyntaxException e) {
    // We can end up here if the test connection is not sending back anything after the POST
    // request
    // due to the connection to the source service not being properly resolved.
    throw IngestionPipelineDeploymentException.byMessage(
        workflow.getName(),
        TRIGGER_ERROR,
        "No response from the test connection. Make sure your service is reachable and accepting connections");
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw IngestionPipelineDeploymentException.byMessage(
        workflow.getName(), TRIGGER_ERROR, e.getMessage());
  }

  throw new PipelineServiceClientException(
      String.format(
          "%s Failed to trigger workflow due to airflow API returned %s and response %s",
          workflow.getName(),
          Response.Status.fromStatusCode(response.statusCode()),
          response.body()));
}
```

On the Airflow side, the `run_automation` route receives the request and calls `execute(automation_workflow)`:

File: `openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py` (L54-85)

```python
@blueprint.route("/run_automation", methods=["POST"])
@csrf.exempt
@requires_access_decorator([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
def run_automation() -> Response:
    """
    Given a WorkflowSource Schema, create the engine
    and test the connection
    """
    json_request = request.get_json(cache=False)

    try:
        automation_workflow = parse_automation_workflow_gracefully(
            config_dict=json_request
        )

        # we need to instantiate the secret manager in case secrets are passed
        SecretsManagerFactory(
            automation_workflow.openMetadataServerConnection.secretsManagerProvider,
            automation_workflow.openMetadataServerConnection.secretsManagerLoader,
        )

        # Should this be triggered async?
        execute(automation_workflow)

        return ApiResponse.success(
            {
                "message": f"Workflow [{escape(automation_workflow.name)}] has been triggered."
            }
        )
    # ... (except clauses omitted from this excerpt)
```

## 3. Installing Airflow (Separate Virtual Environment)

The Airflow version that matches OpenMetadata 1.12:

File: `ingestion/setup.py` (L22-23)

```python
VERSIONS = {
    "airflow": "apache-airflow==3.1.7",
    # ...
}
```

### Step 1: Create a dedicated virtual environment for Airflow

Make sure the ingestion environment is already installed (see the previous article).

```bash
# Keep this completely separate from the ingestion env
python3.11 -m venv ~/airflow-env
source ~/airflow-env/bin/activate
# Optional: use the Aliyun PyPI mirror
pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/
```

### Step 2: Install Airflow

```bash
pip install "apache-airflow==3.1.7" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.7/constraints-3.11.txt"
```

If the constraints file does not exist (the URL returns 404), drop the constraint and install directly:

```bash
pip install "apache-airflow==3.1.7"
```

### Step 3: Install the OpenMetadata Airflow plugin

```bash
cd ~/OpenMetadata/openmetadata-airflow-apis
pip install -e .
```

The plugin registers itself with Airflow through `[project.entry-points."airflow.plugins"]`:

File: `openmetadata-airflow-apis/pyproject.toml` (L47-48)

```toml
[project.entry-points."airflow.plugins"]
openmetadata_managed_apis = "openmetadata_managed_apis.plugin:RestApiPlugin"
```

### Step 4: Initialize the Airflow database

By default, Airflow uses SQLite. If you are not using MySQL, you can skip the `~/.bashrc` configuration below. With MySQL, the `mysql+pymysql` URL also requires the PyMySQL driver (`pip install pymysql`) in the Airflow environment. The `airflow_db` database must already exist in MySQL.

```bash
cat >> ~/.bashrc << 'EOF'
export AIRFLOW_HOME=~/airflow
export DB_SCHEME=mysql+pymysql
export DB_USER=root
export DB_PASSWORD=123456
export DB_HOST=localhost
export DB_PORT=3306
export AIRFLOW_DB=airflow_db
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=mysql+pymysql://root:123456@localhost:3306/airflow_db
EOF
source ~/.bashrc

# Initialize the database
airflow db migrate
```

## 4. Starting Airflow

### Starting Airflow 3.x

In its default setup, Airflow 3.x no longer provides the `airflow users` command. Start Airflow with the `standalone` command instead, which creates an admin user automatically:

```bash
source ~/airflow-env/bin/activate
# Start Airflow
airflow standalone
```

The auto-generated password is printed in the startup log:

```text
standalone | Login with username: admin  password: <auto-generated password>
standalone | Airflow Standalone is for development purposes only.
```

Write this password down. You will need it later to configure the Java backend.

### Changing the default port (when 8080 is in use)

Check the currently configured port:

```bash
airflow config get-value api port
```

Find out which process is using port 8080:

```bash
sudo lsof -i :8080
```

**Option 1: environment variable (temporary)**

```bash
AIRFLOW__API__PORT=8090 airflow standalone
```

**Option 2: edit `airflow.cfg` (permanent)**

```bash
# Find the config file location
echo $AIRFLOW_HOME   # defaults to ~/airflow
```

Edit `~/airflow/airflow.cfg`:

```ini
[api]
port = 8090
```

If you change the port, update `apiEndpoint` in `openmetadata.yaml` to match.

## 5. Configuring the Java Backend to Connect to Airflow

Open the Java backend configuration file `conf/openmetadata.yaml` and edit the `pipelineServiceClientConfiguration` section:

File: `conf/openmetadata.yaml` (L544-589)

```yaml
pipelineServiceClientConfiguration:
  enabled: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
  # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
  className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient}
  apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
  metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
  ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
  hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-}
  healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
  # This SSL information is about the OpenMetadata server.
  # It will be picked up from the pipelineServiceClient to use/ignore SSL when connecting to the OpenMetadata server.
  verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-no-ssl} # Possible values are no-ssl, ignore, validate
  sslConfig:
    certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-} # Local path for the Pipeline Service Client

  logStorageConfiguration:
    type: ${PIPELINE_SERVICE_CLIENT_LOG_TYPE:-default} # Possible values are default, s3
    enabled: ${PIPELINE_SERVICE_CLIENT_LOG_ENABLED:-false} # Enable it for pipelines deployed in the server
    # if type is s3, provide the following configuration
    bucketName: ${PIPELINE_SERVICE_CLIENT_LOG_BUCKET_NAME:-}
    # optional path within the bucket to store the logs
    prefix: ${PIPELINE_SERVICE_CLIENT_LOG_PREFIX:-}
    enableServerSideEncryption: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ENABLED:-false}
    sseAlgorithm: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ALGORITHM:-AES256} # Allowed values: AES256 or aws:kms
    kmsKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_KMS_KEY_ID:-} # Required only if sseAlgorithm is aws:kms
    awsConfig:
      enabled: ${PIPELINE_SERVICE_CLIENT_AWS_IAM_AUTH_ENABLED:-false}
      awsAccessKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ACCESS_KEY_ID:-}
      awsSecretAccessKey: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SECRET_ACCESS_KEY:-}
      awsRegion: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-}
      awsSessionToken: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SESSION_TOKEN:-}
      endPointURL: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ENDPOINT_URL:-} # port forward localhost:9000 for minio

  # Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env
  # Supported: noop, airflow, env
  secretsManagerLoader: ${PIPELINE_SERVICE_CLIENT_SECRETS_MANAGER_LOADER:-noop}

  # Default required parameters for Airflow as Pipeline Service Client
  parameters:
    ## Airflow parameters
    username: ${AIRFLOW_USERNAME:-admin}
    password: ${AIRFLOW_PASSWORD:-admin}
    timeout: ${AIRFLOW_TIMEOUT:-10}
    # If we need to use SSL to reach Airflow
    truststorePath: ${AIRFLOW_TRUST_STORE_PATH:-}
    truststorePassword: ${AIRFLOW_TRUST_STORE_PASSWORD:-}
    ## Kubernetes client parameters
```

Minimal configuration for a development environment:

```yaml
pipelineServiceClientConfiguration:
  enabled: true
  className: org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient
  apiEndpoint: http://localhost:8080   # Airflow port; change it if you changed the port
  metadataApiEndpoint: http://localhost:8585/api
  parameters:
    username: admin
    password: "<password printed by airflow standalone at startup>"
    timeout: 10
```

Restart the Java backend after the change so the configuration takes effect.

## 6. Verifying the Airflow Connection

**Option 1: via the UI.** In the OpenMetadata UI, go to Settings → Services → Workflows.

**Option 2: via the API.**

```bash
# Test the Airflow health-check endpoint from WSL
# (replace 8080 with your Airflow port and YOUR_PASSWORD with the admin password)
curl -u admin:YOUR_PASSWORD http://localhost:8080/pluginsv2/api/v2/openmetadata/health-auth
```

The Java backend also calls this endpoint periodically to check Airflow's status:

File: `openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java` (L450-509)

```java
@Override
public PipelineServiceClientResponse getServiceStatusInternal() {
  HttpResponse<String> response;
  try {
    String healthUrl = buildURI("health-auth").build().toString();
    response = getRequestAuthenticatedForJsonContent(healthUrl);

    // We can reach the APIs and get the status back from Airflow
    if (response.statusCode() == 200) {
      JSONObject responseJSON = new JSONObject(response.body());
      String ingestionVersion = responseJSON.getString("version");

      return validServerClientVersions(ingestionVersion, SERVER_VERSION)
          ? buildHealthyStatus(ingestionVersion)
          : buildUnhealthyStatus(buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION));
    }

    // Auth error when accessing the APIs
    if (response.statusCode() == 401 || response.statusCode() == 403) {
      return buildUnhealthyStatus(
          String.format(
              "Authentication failed for user [%s] trying to access the Airflow APIs at [%s]",
              this.username, serviceURL.toString()));
    }

    // APIs URL not found
    if (response.statusCode() == 404) {
      return buildUnhealthyStatus(
          String.format(
              "Airflow APIs not found at [%s]. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s",
              serviceURL.toString(), DOCS_LINK));
    }

    return buildUnhealthyStatus(
        String.format(
            "Unexpected status response at [%s]: code [%s] - [%s]",
            serviceURL.toString(), response.statusCode(), response.body()));

  } catch (IOException | URISyntaxException e) {
    String exceptionMsg;
    if (e.getMessage() != null) {
      exceptionMsg =
          String.format(
              "Failed to get Airflow status at [%s] due to [%s].",
              serviceURL.toString(), e.getMessage());
    } else {
      exceptionMsg =
          String.format(
              "Failed to connect to Airflow due to %s. Is the host available at %s?",
              e.getCause().toString(), serviceURL.toString());
    }
    return buildUnhealthyStatus(String.format("%s %s", exceptionMsg, DOCS_LINK));
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return buildUnhealthyStatus(
        String.format(
            "Failed to connect to Airflow due to %s. Is the host available at %s? %s.",
            e.getMessage(), serviceURL.toString(), DOCS_LINK));
  }
}
```

## 7. Managing the Two Virtual Environments

| Virtual environment | Path | Purpose |
|---|---|---|
| ingestion env | `~/OpenMetadata/ingestion/env` | `metadata ingest` and related commands |
| airflow env | `~/airflow-env` | Airflow + OpenMetadata plugin |

Activate the matching environment each time you work:

```bash
# Use ingestion
source ~/OpenMetadata/ingestion/env/bin/activate
# Use Airflow
source ~/airflow-env/bin/activate
```

## 8. Troubleshooting Quick Reference

| Error message | Cause | Fix |
|---|---|---|
| `airflow command error: invalid choice: 'users'` | Airflow 3.x removed the `users` command (default setup) | Run `airflow standalone` directly; it creates the user automatically |
| `Could not open requirements file:` | Backslash line continuations in a multi-line command were not handled correctly | Write the `pip install` command on a single line |
| Test Connection button is grayed out | Airflow is unavailable or not configured | Start Airflow and configure `openmetadata.yaml` |
| `Authentication failed for user [admin]` | Wrong Airflow password in the configuration | Check `parameters.password` in `openmetadata.yaml` |
| `Airflow APIs not found at [...]` | The OpenMetadata plugin is not installed | In the Airflow environment, run `pip install openmetadata-managed-apis` (or `pip install -e .` in `openmetadata-airflow-apis`, as in Step 3) |
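The automatic Airflow 2.x/3.x detection performed by `detectAirflowApiVersion()` can be modeled with a short Python sketch. This is a simplified model, not the client's code. The Airflow 3.x segments (`pluginsv2/api/v2/openmetadata`) come from the Java excerpt, but that excerpt is cut off before its fallback branch. The Airflow 2.x segments (`api/v1/openmetadata`) and the injectable `probe` callback are therefore assumptions for illustration. Unlike the Java method, the sketch only checks for HTTP 200 and does not parse the `version` field.

```python
from typing import Callable, List, Optional

# Airflow 3.x segments, as listed in detectAirflowApiVersion().
AIRFLOW3_SEGMENTS = ["pluginsv2", "api", "v2", "openmetadata"]
# ASSUMPTION: Airflow 2.x fallback segments; the Java excerpt is truncated before this branch.
AIRFLOW2_SEGMENTS = ["api", "v1", "openmetadata"]


def build_plugin_url(base_url: str, segments: List[str], endpoint: str) -> str:
    """Append the plugin path segments and an endpoint name to the Airflow base URL."""
    return "/".join([base_url.rstrip("/"), *segments, endpoint])


def detect_api_segments(
    base_url: str, probe: Callable[[str], int]
) -> Optional[List[str]]:
    """Return the first segment list whose health-auth endpoint answers HTTP 200.

    `probe` is a hypothetical callback that performs an authenticated GET on
    the given URL and returns its status code.
    """
    for segments in (AIRFLOW3_SEGMENTS, AIRFLOW2_SEGMENTS):  # newest API first
        if probe(build_plugin_url(base_url, segments, "health-auth")) == 200:
            return segments
    return None  # neither prefix answered: plugin missing or Airflow unreachable
```

Passing the probe in as a function keeps the detection order testable without a running Airflow. In practice, the probe would wrap an authenticated GET like the `curl` health check.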
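The following Python sketch shows roughly what `AirflowRESTClient.runAutomationsWorkflow()` sends during a Test Connection. It builds the POST to `run_automation` without sending it. It rests on three assumptions:

- the Airflow 3.x prefix `/pluginsv2/api/v2/openmetadata` (the same one the health-auth check uses),
- HTTP Basic auth (as in the version-detection request),
- a hypothetical minimal payload.

The real request body is the serialized `Workflow` entity produced by `JsonUtils.pojoToJson(workflow)`.

```python
import base64
import json
import urllib.request

# ASSUMPTION: Airflow 3.x plugin prefix, the same one used by the health-auth check.
PLUGIN_PREFIX = "/pluginsv2/api/v2/openmetadata"


def build_run_automation_request(
    base_url: str, username: str, password: str, workflow: dict
) -> urllib.request.Request:
    """Build, but do not send, the POST that triggers a test-connection workflow."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + PLUGIN_PREFIX + "/run_automation",
        data=json.dumps(workflow).encode("utf-8"),  # JSON body, like JsonUtils.pojoToJson
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",  # ASSUMPTION: Basic auth is accepted
        },
        method="POST",
    )
```

Passing the request to `urllib.request.urlopen(req)` would contact a live Airflow and run the test connection. A 200 response corresponds to the success branch of the Java method.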
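The MySQL setup exports `DB_*` variables but hard-codes the password directly into `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN`. If the password contains characters such as `@` or `:`, it must be URL-encoded. SQLAlchemy's documentation recommends `urllib.parse.quote_plus` for this. The helper below is a hypothetical sketch that composes the URL from the same `DB_*` variables.

```python
from typing import Mapping
from urllib.parse import quote_plus


def build_sql_alchemy_conn(env: Mapping[str, str]) -> str:
    """Compose AIRFLOW__DATABASE__SQL_ALCHEMY_CONN from the DB_* variables (hypothetical helper)."""
    # URL-encode credentials so characters like '@' or ':' do not break the URL.
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])
    return (
        f"{env['DB_SCHEME']}://{user}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['AIRFLOW_DB']}"
    )
```

After `source ~/.bashrc`, calling `build_sql_alchemy_conn(os.environ)` would give a value that can be exported in place of the hand-written URL.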
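There are two ways to change the Airflow port: the `AIRFLOW__API__PORT` environment variable or the `[api] port` key in `airflow.cfg`. Airflow gives environment variables precedence over `airflow.cfg`, and the minimal sketch below models that rule. It ignores Airflow's other configuration sources (`_CMD`/`_SECRET` variants, secrets backends), and the `resolve_api_port` helper is hypothetical. Whatever port it returns is the one `apiEndpoint` in `openmetadata.yaml` must use.

```python
import configparser
from typing import Mapping, Optional

DEFAULT_API_PORT = 8080  # default port used throughout this article


def resolve_api_port(env: Mapping[str, str], cfg_text: Optional[str]) -> int:
    """Environment variable first, then [api] port in airflow.cfg, then 8080."""
    if "AIRFLOW__API__PORT" in env:
        return int(env["AIRFLOW__API__PORT"])
    if cfg_text:
        # interpolation=None: airflow.cfg values may contain '%' characters.
        parser = configparser.ConfigParser(interpolation=None)
        parser.read_string(cfg_text)
        if parser.has_option("api", "port"):
            return parser.getint("api", "port")
    return DEFAULT_API_PORT
```

For example, `resolve_api_port(os.environ, open(os.path.expanduser("~/airflow/airflow.cfg")).read())` would report the port the local setup is expected to use.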
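The Airflow health check can also be scripted. This Python sketch sends the same authenticated GET as the `curl` command to `/pluginsv2/api/v2/openmetadata/health-auth`. It then maps the result the way `getServiceStatusInternal()` does: 200 is healthy, 401/403 is an authentication failure, 404 means the plugin is missing, and anything else is unexpected. The messages are paraphrased. The server-side version-compatibility check (`validServerClientVersions`) is not reproduced.

```python
import base64
import json
import urllib.error
import urllib.request
from typing import Tuple

# Airflow 3.x plugin path, as used by the curl health check.
HEALTH_PATH = "/pluginsv2/api/v2/openmetadata/health-auth"


def classify_health(status: int, body: str) -> Tuple[bool, str]:
    """Map an HTTP status to (healthy, message), mirroring getServiceStatusInternal()."""
    if status == 200:
        version = json.loads(body).get("version", "unknown")
        return True, f"Healthy, plugin version {version}"
    if status in (401, 403):
        return False, "Authentication failed: check parameters.username/password"
    if status == 404:
        return False, "Airflow APIs not found: is the OpenMetadata plugin installed?"
    return False, f"Unexpected status response: code [{status}] - [{body}]"


def check_airflow(
    base_url: str, username: str, password: str, timeout: float = 10
) -> Tuple[bool, str]:
    """Send an authenticated GET to health-auth and classify the result."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(
        base_url.rstrip("/") + HEALTH_PATH,
        headers={"Authorization": f"Basic {token}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return classify_health(response.status, response.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx, but the status code and body are still available.
        return classify_health(err.code, err.read().decode("utf-8", errors="replace"))
    except OSError as err:
        # Connection refused, DNS failure, timeout, etc. (URLError is an OSError subclass).
        return False, f"Failed to connect to Airflow at {base_url}: {err}"
```

Typical use is `check_airflow("http://localhost:8080", "admin", "YOUR_PASSWORD")`, with the port and password adjusted to your setup.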