筆記本還是運行了3小時。下面是代碼,它一直運行不停;另附上錯誤信息的截圖。
import json

# Backfill script: for every batch recorded in the batch-control table within
# the last `dayn` days, flatten the nested PO/STO message from
# i2726_complex_ns_tbl into i2726_po_sto_ns_tbl, skipping batches that already
# have rows in the target table.
#
# `spark`, `env` and `dayn` are not defined in this cell; they are expected to
# be provided by the surrounding notebook session.

# NOTE(review): the status literal reached this file machine-translated
# ("奔跑"); "Running" is the presumed original -- confirm the exact spelling
# and case against the batch-control table before running.
RAW_STATUS = "Running"

# Bounds of the inclusive yyyymmdd window [today - dayn, today]. The same
# expression is used by all three statements below, so it is built once.
date_window = (
    f"year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100"
    f"+day(date_sub(now(),{dayn}))"
    " AND (year(now())*10000 + month(now())*100+ day(now()))"
)

df = spark.sql(
    f"""select distinct batch_id
        from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl
        where raw_status = '{RAW_STATUS}'
          and year*10000+month*100+day between {date_window}"""
)
my_list = [int(row.batch_id) for row in df.collect()]

# Iterate the batch ids directly; the original index/while loop relied on a
# manual `i = i + 1` whose placement decided whether the loop advanced.
for batch_id in my_list:
    df_check = spark.sql(
        f'''select count(*)
            from sap_ingest_{env}.i2726_po_sto_ns_tbl
            where batch_id = {batch_id}
              and year*10000+month*100+day between {date_window}'''
    )
    batchid_check = df_check.take(1)[0][0]
    print(batchid_check)

    if batchid_check == 0:
        # The select list is positional with respect to the target table, so
        # the column order below is kept exactly as found.
        # NOTE(review): the first part of this list was recovered from a
        # machine-translated copy (e.g. "消息id" -> messageId); verify names and
        # order against the target table schema.
        # NOTE(review): lineItems is exploded twice -- once as `po` (source of
        # `d` and `p`) and once as `m` -- which looks like it pairs every line
        # item with every other line item of the same order. Confirm whether
        # `d`/`p` should be exploded from `m` instead; this may be the cause
        # of the long runtime.
        spark.sql(
            f'''insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl partition(year, month, day)
            select
                y.batchRecordCount, y.correlationId, y.createdTimestamp,
                y.envelopeVersion, y.interfaceId, y.sequenceId,
                y.interfaceName, y.messageId, y.etag, y.payloadKey,
                y.payloadName, y.payloadVersion, y.securityClassification,
                y.sourceApplicationName,
                c.businessUnit, c.countryOfOrigin, c.fobDate,
                c.forwardingAgentDesc, c.forwardingAgentId,
                c.forwardingAgentDelPt, c.incoTerms1, c.incoTerms2,
                c.initialOrderDate, c.initialOrderRef, c.invoicingPartyId,
                c.orderingPartyId, c.rtmPartyId, c.reprocessorPartyId,
                m.boxHangingInd, m.changeIndicator, m.deliveryTime,
                d.amount, d.currency, d.type,
                p.custOrderNumber, p.outloadDate, p.collection_date,
                p.celebration_date, p.cust_message,
                m.legacyProductId, m.lineItemId, m.netweight, m.orderUnit,
                m.productId, m.quantityPerOrderUnit, m.recStorageLocation,
                m.season, m.totalQuantity, m.ultRecSapCode,
                m.ultimateReceivingLocationId, m.unitsOfWeight, m.contractID,
                m.contractLineItem, m.dispatchPriority, m.updateReason,
                m.knittedWoven, m.year,
                c.msgType, c.orderNumber, c.orderTypeSAP, c.orderType,
                c.portOfLoading, c.purchDept, c.purchaseOrderType,
                c.recSapCode, c.receivingLocationId,
                c.receivingLocationType, c.requestedDeliveryDate,
                c.sendSapCode, c.sendingLocationId, c.sendingLocationType,
                c.shippingMethod, c.supplierFactoryDesc, c.supplierFactoryId,
                c.timestamp,
                k.amount, k.currency, k.type,
                x.load_timestamp, x.batch_id, x.year, x.month, x.day
            from sap_ingest_{env}.i2726_complex_ns_tbl as x
            lateral view outer explode ((array(x.header))) explode_product as y
            lateral view outer explode ((array(x.payload))) explode_product as c
            lateral view outer explode(((c.totalPOValue))) explode_product as k
            lateral view outer explode ((c.lineItems)) explode_product as po
            lateral view outer explode ((po.itemValue)) explode_product as d
            lateral view outer explode((x.payload.lineItems)) explode_product as m
            lateral view outer explode((po.customer_order)) explode_product as p
            where x.batch_id = "{batch_id}"
              and x.year*10000+x.month*100+x.day between {date_window} '''
        )
    else:
        print('Data already loaded for this batch')
        print(batch_id)