
m_120_db_access_bigdata_tables

Workflow

An overview of KNIME-based functions for accessing big data systems (including PySpark). Use it on your own big data system.
10 - Use KNIME's Database framework and its nodes

20 - You can work with 'open' SQL code (if you know what you are doing :-)) or use simple KNIME nodes instead

    DROP TABLE IF EXISTS `$${Sv_big_data_schema}$$`.`tmp_db_order_customer_product_02`;
    CREATE TABLE `$${Sv_big_data_schema}$$`.`tmp_db_order_customer_product_02` AS
    SELECT *
    FROM `$${Sv_big_data_schema}$$`.`tmp_db_order_customer` t1
    LEFT JOIN `$${Sv_big_data_schema}$$`.`tmp_product` t2
    ON `t1`.`odetail_ProductId` = `t2`.`prod_Id`;

25 - KNIME Nodes & SQL Query - Group by and Having
30 - KNIME Nodes & SQL Query - Rename & Column Filter
35 - KNIME Nodes & SQL Query - Order by
40 - KNIME Nodes & SQL Query - Row Filter & Where
45 - KNIME Nodes & SQL Query - Case When
50 - KNIME Nodes & SQL Query - Datetimes in SQL

100 - bring your (big) data to the Spark framework
110 - bring data back from Spark to Hive/Impala
120 - Spark also supports SQL, whether in open code or through KNIME's nodes (or a combination of both)

    SELECT `prod_productname`, COUNT(DISTINCT `order_id`) AS no_distinct_orders
    FROM #table# t1
    GROUP BY `prod_productname`

200 - use PySpark code to work with data imported from Spark connector
210 - use PySpark in open code to import and work with data from your big data system
220 - data saved from PySpark node

    # 230 - simple example of GroupBy
    resultDataFrame1 = dataFrame1.groupBy('cust_country').sum('odetail_quantity')

    # 240 - Aggregate with multiple functions
    from pyspark.sql import functions as F

    # aggregate data
    resultDataFrame1 = dataFrame1.groupby('cust_country').agg(
        F.avg(F.col('odetail_quantity')).alias('quantity_avg'),
        F.sum(F.col('odetail_quantity')).alias('quantity_sum')
    )

    df2 = dataFrame1.filter(
        (dataFrame1['order_freight'] > 50) & (dataFrame1['order_freight'] < 250)
    ).select(['order_freight', 'order_id', 'order_shipcity'])
    df2 = df2.sort('order_freight', ascending=False)
    # rename columns
    df2 = df2.withColumnRenamed('order_freight', 'freight').withColumnRenamed('order_shipcity', 'ship_city')
    resultDataFrame1 = df2

    resultDataFrame2 = spark.sql("SELECT cust_country, ROUND(AVG(odetail_discount),3) as avg_discount_bycountry, COUNT(*) as no_lines FROM df GROUP BY cust_country ORDER BY no_lines DESC NULLS LAST")

    df = spark.sql("SELECT cust_city, COUNT(DISTINCT order_id) as no_single_order_id FROM ocean_fs_prod.tmp_db_order_customer_product_01 GROUP BY cust_city ORDER BY no_single_order_id DESC")
    # construct a name for the table to be stored
    var_databse_to_save_to = flow_variables['v_v_big_data_schema'] + ".tmp_result_orders_by_city"
    # https://kontext.tech/column/spark/294/spark-save-dataframe-to-hive-table
    # you could also append the data
    df.write.mode("overwrite").saveAsTable(var_databse_to_save_to)

    resultDataFrame1 = spark.sql("SELECT * FROM ocean_fs_prod.tmp_db_order_customer_product_01")

Use SQL with Impala/Hive and Spark, and also PySpark, to access and manipulate data on a big data system. The example is from the classic MS "Northwind" database.
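The step that constructs the target table name from a flow variable can be sketched in plain Python. This is only an illustration under assumptions: the `flow_variables` dict below is a hypothetical stand-in for the one KNIME provides inside a PySpark Script node, the value `ocean_fs_prod` is assumed from the schema that appears in the queries, and `qualified_table_name` is a helper name invented for this sketch.

```python
import re

# Hypothetical stand-in for the flow_variables dict available inside a
# KNIME PySpark Script node; the schema value is an assumption.
flow_variables = {"v_v_big_data_schema": "ocean_fs_prod"}

# A plain SQL identifier: letters, digits and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def qualified_table_name(schema: str, table: str) -> str:
    """Return 'schema.table', rejecting parts that are not plain identifiers."""
    for part in (schema, table):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"not a plain SQL identifier: {part!r}")
    return f"{schema}.{table}"


target = qualified_table_name(
    flow_variables["v_v_big_data_schema"], "tmp_result_orders_by_city"
)
print(target)
```

Checking the parts before concatenating them is a design choice of this sketch, not something the workflow does; it simply guards against a flow variable that contains spaces or punctuation ending up in a table name.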
Node labels in the workflow diagram (repeated labels listed once): enter your Impala/HDFS/Spark connection; Customer; Order; OrderDetail; Add Customer Information; Add Order Details; tmp_db_order_customer; remove tmp_db_order_customer if it is there; Product; combine Product and Customer Information; tmp_db_order_customer_product_02 => query in open SQL code (you will have to know what you are doing); remove tmp_db_order_customer_product_01 if it is there; 11 - direct query and results to KNIME; group by ShipRegion; Fun with date and datetimes in Hive; PySpark code from Spark connection; simple example of GroupBy; access file on BigData with PySpark (db_order_customer_product_01); save result of calculation as table "tmp_result_orders_by_city"; fetch *all* lines?!; Aggregate with multiple functions; Discounts; some housekeeping: destroy your SparkContext at the end; Freights between 50 and 250 ordered by City; tmp_result_orders_by_city; 105 - keep the result in memory if it has been run once for further analysis; apply categorical encoder; fetch *all* lines!; ^(?!no_distinct_orders).*$; group by ShipRegion in SQL code; order_id => bestell_id; filter columns; ShipRegion like '%Europe%'; order by; where ShipRegion like '%Europe%'; group by & having; overwrite tmp_db_order_customer_03 without product; case when ShipRegion; append tmp_db_order_customer_03 without product; v_big_data_schema; Product => to Spark; Customer => to Spark; tmp_db_order_customer_product_01; your bigdata connect environment.

Node types used in the workflow (each listed once): DB Table Selector, DB Joiner, DB Connection Table Writer, DB Table Remover, DB SQL Executor, DB Reader, DB Query Reader, DB Query, DB Sorter, DB GroupBy, DB Column Rename, DB Column Filter, DB Row Filter, Spark Joiner, Spark SQL Query, Spark Column Filter, Spark Category To Number, Spark Transformations Applier, Persist Spark DataFrame/RDD, Spark to Table, Impala to Spark, Spark to Impala, PySpark Script (1 to 1), PySpark Script (1 to 2), PySpark Script Source, Destroy Spark Context, String Input, Transpose.
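The "remove ... if it is there" and "query in open SQL code" steps amount to a drop-and-recreate pattern followed by a grouped distinct count. The sketch below runs that pattern locally with Python's built-in `sqlite3`, which is only a stand-in here: the workflow itself targets Impala/Hive, whose SQL dialect differs in details. The sample rows and the helper name `rebuild_joined_table` are made up for illustration.

```python
import sqlite3

# SQLite is a local stand-in; the workflow runs against Impala/Hive.
conn = sqlite3.connect(":memory:")

# Illustrative sample data (not taken from the workflow).
conn.executescript("""
CREATE TABLE tmp_db_order_customer (order_id INTEGER, odetail_ProductId INTEGER);
CREATE TABLE tmp_product (prod_Id INTEGER, prod_productname TEXT);
INSERT INTO tmp_db_order_customer VALUES (1, 10), (1, 11), (2, 10), (3, 99);
INSERT INTO tmp_product VALUES (10, 'Chai'), (11, 'Chang');
""")


def rebuild_joined_table(connection: sqlite3.Connection) -> None:
    """Drop the joined table if it exists, then recreate it from a LEFT JOIN."""
    connection.executescript("""
    DROP TABLE IF EXISTS tmp_db_order_customer_product_02;
    CREATE TABLE tmp_db_order_customer_product_02 AS
    SELECT *
    FROM tmp_db_order_customer t1
    LEFT JOIN tmp_product t2
      ON t1.odetail_ProductId = t2.prod_Id;
    """)


rebuild_joined_table(conn)
rebuild_joined_table(conn)  # running it twice does not fail or duplicate rows

# Count distinct orders per product on the joined table.
rows = conn.execute("""
SELECT prod_productname, COUNT(DISTINCT order_id) AS no_distinct_orders
FROM tmp_db_order_customer_product_02
GROUP BY prod_productname
""").fetchall()
orders_per_product = dict(rows)
print(orders_per_product)
```

Because of the LEFT JOIN, the order line whose product id has no match is kept and ends up in a group with a NULL product name, which is worth remembering when reading the grouped counts.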


Nodes

m_120_db_access_bigdata_tables consists of the following 66 node(s):

Plugins

m_120_db_access_bigdata_tables contains nodes provided by the following 6 plugin(s):