kn_example_bigdata_hive_self_healing_jobs

Hive/Big Data - a simple "self-healing" (automated) ETL or analytics system on a big data cluster
3. "self-healing" daily job to do the analysis (check if the taks for the day is done, if yes: sleep, if no, do the job) 2. CREATE the (empty) table to store the reporting "default.db_analytics"the structure of the table is derived from the structure of a table created with the KNIME DB nodes (it will write the SQL for you) YOUR analysis or ETL jobinitially do a sample of you analysis once (in a real life scenario this table might already exist) YOUR analysis or ETL job(this would be executed once a day) 1. CREATE a basic table "default.db_main_table" (in a real life scenario this table might already exist)also demonstrates the UPLOAD of data into partitions on a big data cluster. The partition variables are always the last columns Hive/Big Data - a simple "self-healing" (automated) ETL or analytics system on a big data clusterThe scenario: you have a table on a big data system with daily data ("default.db_main_table") partitioned by d_date and you want to do some reporting (the number of lines per day) stored in a new table "default.db_analytics"The system is a big data system and you want to do this with partitioned Hive tablesThe main workflow should be able to be run several times per day and then do the report if it has not been done yet. If a day is missing it will do the job again until it is finished. You could do this by hand or schedule that on the KNIME server. -- additional options when creating the tablePARTITIONED BY (d_date STRING) COMMENT 'this is the MAIN TABLE in my big data system' STORED AS PARQUETTBLPROPERTIES ( 'parquet.compression'='snappy' , 'external.table.purge'='true' , 'transactional'='false' , 'discover.partitions' = 'false' , 'table.owner' = 'John Doe' , 'table.owner_team' = 'Data Analytics' , 'table.housekeeping'='{"info": "housekeeping by daily partition" , "attribute": "d_date" , "attribute_string_format": "yyyy-MM-dd" }' ) let KNIME write the SQL code to CREATE the big data table DROP TABLE IF EXISTS $${Sv_big_data_schema}$$.db_analytics_temp;DROP TABLE IF EXISTS $${Sv_big_data_schema}$$.db_analytics_temp;CREATE TABLE $${Sv_big_data_schema}$$.db_analytics_temp/* STORED AS PARQUET */STORED AS ORCCOMMENT 'this is just a temporary table' TBLPROPERTIES ( 'parquet.compression'='snappy' , 'external.table.purge'='true' , 'transactional'='false' , 'discover.partitions' = 'false' , 'table.owner' = 'John Doe' , 'table.owner_team' = 'Data Analytics' , 'table.housekeeping'='{ "info": "deletion date defined" , "deletion_date": "$${Sstart_date}$$" }' ) AS $${Ssql_001}$$; CREATE TABLE IF NOT EXISTS $${Sv_big_data_schema}$$.db_analytics ($${Sv_sql_string2}$$ )PARTITIONED BY (d_date STRING) COMMENT 'this is the analytics table in my big data system' STORED AS PARQUETTBLPROPERTIES ('parquet.compression'='snappy' , 'external.table.purge'='true' , 'transactional'='false' , 'discover.partitions' = 'false' , 'table.owner' = 'John Doe' , 'table.owner_team' = 'Data Analytics' , 'table.housekeeping'='{"info": "housekeeping by daily partition" , "attribute": "d_date" , "attribute_string_format": "yyyy-MM-dd" }' ); 4. 

2. CREATE the (empty) table to store the reporting, "default.db_analytics" (in a real-life scenario this table might already exist). The structure of the table is derived from the structure of a table created with the KNIME DB nodes, so KNIME writes the SQL for you: run a sample of YOUR analysis or ETL job once (here: count the rows per Cluster_Membership for one d_date into a column number_of), take just the structure of the result (WHERE 1=2, stored in the variable sql_001), and extract the table spec. A Rule Engine then maps the KNIME column types to Hive types:

$Column Type$ LIKE "String*"   => "STRING"
$Column Type$ LIKE "*long*"    => "BIGINT"
$Column Type$ LIKE "*double*"  => "DOUBLE"
$Column Type$ LIKE "*integer*" => "INTEGER"
$Column Type$ LIKE "*Time*"    => "TIMESTAMP"
$Column Type$ LIKE "*Date*"    => "DATE"

You might have to extend these rules if you have timestamps, BIGINT columns or other unknown types. The flow variable v_big_data_schema holds the name of your big data schema, and v_sql_string2 holds the resulting column definitions.

Create a temporary table from the structure-only query (the flow variable start_date holds the current date and is written into the table properties as a deletion_date):

DROP TABLE IF EXISTS $${Sv_big_data_schema}$$.db_analytics_temp;

CREATE TABLE $${Sv_big_data_schema}$$.db_analytics_temp
/* STORED AS PARQUET */
STORED AS ORC
COMMENT 'this is just a temporary table'
TBLPROPERTIES (
  'parquet.compression'='snappy',
  'external.table.purge'='true',
  'transactional'='false',
  'discover.partitions'='false',
  'table.owner'='John Doe',
  'table.owner_team'='Data Analytics',
  'table.housekeeping'='{"info": "deletion date defined", "deletion_date": "$${Sstart_date}$$"}'
)
AS $${Ssql_001}$$;

Then create the (empty) reporting table, partitioned by d_date:

CREATE TABLE IF NOT EXISTS $${Sv_big_data_schema}$$.db_analytics ($${Sv_sql_string2}$$)
PARTITIONED BY (d_date STRING)
COMMENT 'this is the analytics table in my big data system'
STORED AS PARQUET
TBLPROPERTIES (
  'parquet.compression'='snappy',
  'external.table.purge'='true',
  'transactional'='false',
  'discover.partitions'='false',
  'table.owner'='John Doe',
  'table.owner_team'='Data Analytics',
  'table.housekeeping'='{"info": "housekeeping by daily partition", "attribute": "d_date", "attribute_string_format": "yyyy-MM-dd"}'
);
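
To make the mechanism concrete, here is a hedged example of what the two generated strings could look like for the demo aggregation (the actual values are built by the workflow at runtime, so the details may differ):

-- sql_001: the sample query with WHERE 1=2, returning only the structure
SELECT Cluster_Membership, COUNT(*) AS number_of
FROM default.db_main_table
WHERE 1 = 2
GROUP BY Cluster_Membership

-- v_sql_string2: the derived column definitions used in the CREATE TABLE statement
Cluster_Membership STRING, number_of BIGINT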

3. The "self-healing" daily job that does the analysis: it checks whether the task for the day is already done; if yes it sleeps, if no it does the job. The DECISION to run or not to run is made by comparing the partitions of db_main_table and db_analytics and determining which days still have to be processed (with an option to only look at the last 5 days). The missing days are then handled in a loop (Table Row To Variable Loop Start / Variable Loop End), wrapped in Try/Catch (Variable Ports) nodes so that one failed day does not break the whole run.

YOUR analysis or ETL job (this is what would be executed once a day): filter db_main_table by the current d_date, count the rows per Cluster_Membership (into number_of), write the result into db_analytics_temp, and insert it into the partitioned reporting table with INSERT /* +SHUFFLE */ INTO db_analytics. Afterwards the temporary table db_analytics_temp is dropped. Optionally, if you want to test a single entry, you can delete one partition by hand first:

ALTER TABLE $${Sv_big_data_schema}$$.db_analytics DROP IF EXISTS PARTITION (d_date = "$${Sd_date}$$");
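
The workflow derives the missing days with KNIME nodes (SHOW PARTITIONS plus a comparison and a loop), but the idea of one loop iteration can be sketched in plain HiveQL. The 5-day look-back and the column names follow the demo, and '${missing_d_date}' stands for the loop's current date; adapt everything to your own job:

-- which days exist in the main table but have no partition in the analytics table yet?
SELECT DISTINCT m.d_date
FROM default.db_main_table m
LEFT JOIN default.db_analytics a
  ON m.d_date = a.d_date
WHERE a.d_date IS NULL
  AND m.d_date >= date_format(date_sub(current_date(), 5), 'yyyy-MM-dd');

-- for every returned d_date the daily job then (re)builds that one partition, roughly:
INSERT INTO default.db_analytics PARTITION (d_date = '${missing_d_date}')
SELECT Cluster_Membership, COUNT(*) AS number_of
FROM default.db_main_table
WHERE d_date = '${missing_d_date}'
GROUP BY Cluster_Membership;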

4. Housekeeping - delete old partitions. The workflow reads SHOW PARTITIONS $${Sv_big_data_schema}$$.`db_analytics` (before and after the housekeeping), calculates how many days old each partition is, filters the partitions that are older than or equal to v_days_old_partition, and drops them one by one in a loop:

ALTER TABLE $${Sv_big_data_schema}$$.db_analytics DROP IF EXISTS PARTITION ($${Spartition_name}$$ = "$${Spartition_date}$$");

The loop is used because older versions of Hive do not accept comparison operators (>, <) when dropping partitions. On Impala you could drop a whole range in one statement:

/* on Impala it would look like this */
ALTER TABLE $${Sv_big_data_schema}$$.db_analytics DROP IF EXISTS PARTITION ($${Spartition_name}$$ < to_date(date_sub(now(),90)));
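
If your Hive version does accept comparison operators in the partition spec, the loop can be replaced by a single statement. A sketch, assuming the partition values keep the yyyy-MM-dd format (so the strings sort chronologically) and that the cutoff date is computed beforehand, e.g. as a KNIME flow variable:

-- compute the cutoff once (90 days back, as in the Impala example)
SELECT date_format(date_sub(current_date(), 90), 'yyyy-MM-dd') AS cutoff_date;

-- then drop everything older in one go; '${cutoff_date}' is a placeholder for that value
ALTER TABLE default.db_analytics DROP IF EXISTS PARTITION (d_date < '${cutoff_date}');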

Throughout the workflow there are optional nodes to inspect the big data system, for example:

SHOW databases
SHOW tables IN $${Sv_big_data_schema}$$
SHOW CREATE TABLE $${Sv_big_data_schema}$$.`db_main_table`      -- show how the table was created
SHOW CREATE TABLE $${Sv_big_data_schema}$$.`db_analytics`
SHOW CREATE TABLE $${Sv_big_data_schema}$$.`db_analytics_temp`
DESCRIBE EXTENDED $${Sv_big_data_schema}$$.`db_main_table`      -- get information about a big data table
DESCRIBE EXTENDED $${Sv_big_data_schema}$$.`db_analytics`
SHOW PARTITIONS $${Sv_big_data_schema}$$.`db_analytics`

Nodes

DB Table Remover, Create Local Big Data Environment, Metadata for Big Data, DB Loader, DB Table Creator, DB Table Selector, DB Query, DB Query Reader, DB Query Extractor, DB SQL Executor, DB GroupBy, DB Column Rename, DB Connection Table Writer, DB Connection Extractor, DB Reader, Extract Table Spec, Rule Engine, Rule-based Row Filter, Empty Table Switch, Java Edit Variable (simple), String Input, Integer Input, Merge Variables, Data Generator, Column Rename, Column Filter, Constant Value Column, Cell Splitter, String to Date&Time, Date&Time Shift, Date&Time Difference, Date&Time to String, Math Formula, Counting Loop Start, Table Row To Variable Loop Start, Variable Loop End, Partitioning, Sorter, Cache, Try (Variable Ports), Catch Errors (Var Ports), plus a few metanodes (determine upload path, create timestamp, SQL strings).

Extensions

Links