//Indexer.cpp —— int main ( int argc, char ** argv )
这一部分主要是命令行下参数的读入,我们这里采用的是Indexer.exe -c d:/csft_mysql.conf -All命令,这里,argc = 4,即有3个参数,argv[1] :-c就是读取指定config文档,argv[2]: d: / csft_mysql.conf就配置文件config的位置,argv[3]就代表对config文件中指定的所有数据源进行索引构建
//Do argv[i] 解析命令行 const char * sOptConfig = NULL; bool bMerge = false; CSphVectordMergeDstFilters; CSphVector dIndexes; bool bIndexAll = false; bool bMergeKillLists = false; int i; for ( i=1; i 1 ) { fprintf ( stdout, "ERROR: malformed or unknown option near '%s'.\n", argv[i] ); } else { fprintf ( stdout, "Usage: indexer [OPTIONS] [indexname1 [indexname2 [...]]]\n" "\n" "Options are:\n" "--config \t\tread configuration from specified file\n" "\t\t\t(default is csft.conf)\n" "--all\t\t\treindex all configured indexes\n" "--quiet\t\t\tbe quiet, only print errors\n" "--noprogress\t\tdo not display progress\n" "\t\t\t(automatically on if output is not to a tty)\n" #if !USE_WINDOWS "--rotate\t\tsend SIGHUP to searchd when indexing is over\n" "\t\t\tto rotate updated indexes automatically\n" #endif "--buildstops \n" "\t\t\tbuild top N stopwords and write them to given file\n" "--buildfreqs\t\tstore words frequencies to output.txt\n" "\t\t\t(used with --buildstops only)\n" "--merge \n" "\t\t\tmerge 'src-index' into 'dst-index'\n" "\t\t\t'dst-index' will receive merge result\n" "\t\t\t'src-index' will not be modified\n" "--merge-dst-range \n" "\t\t\tfilter 'dst-index' on merge, keep only those documents\n" "\t\t\twhere 'attr' is between 'min' and 'max' (inclusive)\n" "--merge-killlists" "\t\t\tmerge src and dst killlists instead of applying src killlist to dst" "\n" "Examples:\n" "indexer --quiet myidx1\treindex 'myidx1' defined in 'csft.conf'\n" "indexer --all\t\treindex all indexes defined in 'csft.conf'\n" ); } return 1; }
class CSphConfigParser { public: CSphConfig m_tConf; public: CSphConfigParser (); bool Parse ( const char * sFileName, const char * pBuffer = NULL ); protected: CSphString m_sFileName; int m_iLine; CSphString m_sSectionType; CSphString m_sSectionName; char m_sError [ 1024 ]; int m_iWarnings; static const int wARNS_THRESH = 5; protected: bool IsPlainSection ( const char * sKey ); bool IsNamedSection ( const char * sKey ); bool AddSection ( const char * sType, const char * sSection ); void AddKey ( const char * sKey, char * sValue ); bool ValidateKey ( const char * sKey ); #if !USE_WINDOWS bool TryToExec ( char * pBuffer, char * pEnd, const char * szFilename, CSphVector& dResult ); #endif char * GetBufferString ( char * szDest, int iMax, const char * & szSource ); };
/// config (hash of section types) typedef SmallStringHash_T < CSphConfigType > CSphConfig;
//Indexer.cpp —— int main ( int argc, char ** argv )
/////////////// // load config /////////////// CSphConfigParser cp; CSphConfig & hConf = cp.m_tConf; sOptConfig = sphLoadConfig ( sOptConfig, g_bQuiet, cp ); if ( !hConf ( "source" ) ) sphDie ( "no indexes found in config file '%s'", sOptConfig ); g_iMemLimit = 0; if ( hConf("indexer") && hConf["indexer"]("indexer") ) { CSphConfigSection & hIndexer = hConf["indexer"]["indexer"]; g_iMemLimit = hIndexer.GetSize ( "mem_limit", 0 ); g_iMaxXmlpipe2Field = hIndexer.GetSize ( "max_xmlpipe2_field", 2*1024*1024 ); g_iWriteBuffer = hIndexer.GetSize ( "write_buffer", 1024*1024 ); sphSetThrottling ( hIndexer.GetInt ( "max_iops", 0 ), hIndexer.GetSize ( "max_iosize", 0 ) ); }
const char * sphLoadConfig ( const char * sOptConfig, bool bQuiet, CSphConfigParser & cp ) { // fallback to defaults if there was no explicit config specified while ( !sOptConfig ) { #ifdef SYSCONFDIR sOptConfig = SYSCONFDIR "/csft.conf"; if ( sphIsReadable(sOptConfig) ) break; #endif sOptConfig = "./csft.conf"; if ( sphIsReadable(sOptConfig) ) break; sOptConfig = NULL; break; } if ( !sOptConfig ) sphDie ( "no readable config file (looked in " #ifdef SYSCONFDIR SYSCONFDIR "/csft.conf, " #endif "./csft.conf)" ); if ( !bQuiet ) fprintf ( stdout, "using config file '%s'...\n", sOptConfig ); // load config if ( !cp.Parse ( sOptConfig ) )//Parser为实际解析函数 sphDie ( "failed to parse config file '%s'", sOptConfig ); CSphConfig & hConf = cp.m_tConf; if ( !hConf ( "index" ) ) sphDie ( "no indexes found in config file '%s'", sOptConfig ); return sOptConfig; }
//Indexer.cpp —— int main ( int argc, char ** argv )
///////////////////// // index each index //////////////////// sphStartIOStats (); bool bIndexedOk = false; // if any of the indexes are ok if ( bMerge ) { if ( dIndexes.GetLength()!=2 ) sphDie ( "there must be 2 indexes to merge specified" ); if ( !hConf["index"](dIndexes[0]) ) sphDie ( "no merge destination index '%s'", dIndexes[0] ); if ( !hConf["index"](dIndexes[1]) ) sphDie ( "no merge source index '%s'", dIndexes[1] ); bIndexedOk = DoMerge ( hConf["index"][dIndexes[0]], dIndexes[0], hConf["index"][dIndexes[1]], dIndexes[1], dMergeDstFilters, g_bRotate, bMergeKillLists ); } else if ( bIndexAll ) { hConf["index"].IterateStart (); while ( hConf["index"].IterateNext() ) bIndexedOk |= DoIndex ( hConf["index"].IterateGet (), hConf["index"].IterateGetKey().cstr(), hConf["source"] );//在这里构建索引,核心函数为DoIndex } else { ARRAY_FOREACH ( i, dIndexes ) { if ( !hConf["index"](dIndexes[i]) ) fprintf ( stdout, "WARNING: no such index '%s', skipping.\n", dIndexes[i] ); else bIndexedOk |= DoIndex ( hConf["index"][dIndexes[i]], dIndexes[i], hConf["source"] ); } } sphShutdownWordforms (); const CSphIOStats & tStats = sphStopIOStats (); if ( !g_bQuiet ) { ReportIOStats ( "reads", tStats.m_iReadOps, tStats.m_iReadTime, tStats.m_iReadBytes ); ReportIOStats ( "writes", tStats.m_iWriteOps, tStats.m_iWriteTime, tStats.m_iWriteBytes ); }
//Indexer.cpp ——DoIndex(const CSphConfigSection & hIndex, const char * sIndexName, const CSphConfigType & hSource)
if ( hIndex("type") && hIndex["type"]=="distributed" ) { if ( !g_bQuiet ) { fprintf ( stdout, "distributed index '%s' can not be directly indexed; skipping.\n", sIndexName ); fflush ( stdout ); } return false; } if ( !g_bQuiet ) { fprintf ( stdout, "indexing index '%s'...\n", sIndexName ); fflush ( stdout ); }
// check config if ( !hIndex("path") ) { fprintf ( stdout, "ERROR: index '%s': key 'path' not found.\n", sIndexName ); return false; } if ( ( hIndex.GetInt ( "min_prefix_len", 0 ) > 0 || hIndex.GetInt ( "min_infix_len", 0 ) > 0 ) && hIndex.GetInt ( "enable_star" ) == 0 ) { const char * szMorph = hIndex.GetStr ( "morphology", "" ); if ( szMorph && *szMorph && strcmp ( szMorph, "none" ) ) { fprintf ( stdout, "ERROR: index '%s': infixes and morphology are enabled, enable_star=0\n", sIndexName ); return false; } }
/////////////////// // spawn tokenizer /////////////////// CSphString sError; CSphTokenizerSettings tTokSettings; if ( !sphConfTokenizer ( hIndex, tTokSettings, sError ) ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); ISphTokenizer * pTokenizer = ISphTokenizer::Create ( tTokSettings, sError ); if ( !pTokenizer ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); CSphDict * pDict = NULL; CSphDictSettings tDictSettings; if ( !g_sBuildStops ) { ISphTokenizer * pTokenFilter = NULL; sphConfDictionary ( hIndex, tDictSettings ); pDict = sphCreateDictionaryCRC ( tDictSettings, pTokenizer, sError ); if ( !pDict ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); if ( !sError.IsEmpty () ) fprintf ( stdout, "WARNING: index '%s': %s\n", sIndexName, sError.cstr() ); pTokenFilter = ISphTokenizer::CreateTokenFilter ( pTokenizer, pDict->GetMultiWordforms () ); pTokenizer = pTokenFilter ? pTokenFilter : pTokenizer; }
// prefix/infix indexing int iPrefix = hIndex("min_prefix_len") ? hIndex["min_prefix_len"].intval() : 0; int iInfix = hIndex("min_infix_len") ? hIndex["min_infix_len"].intval() : 0; iPrefix = Max ( iPrefix, 0 ); iInfix = Max ( iInfix, 0 ); CSphString sPrefixFields, sInfixFields; if ( hIndex.Exists ( "prefix_fields" ) ) sPrefixFields = hIndex ["prefix_fields"].cstr (); if ( hIndex.Exists ( "infix_fields" ) ) sInfixFields = hIndex ["infix_fields"].cstr (); if ( iPrefix == 0 && !sPrefixFields.IsEmpty () ) fprintf ( stdout, "WARNING: min_prefix_len = 0. prefix_fields are ignored\n" ); if ( iInfix == 0 && !sInfixFields.IsEmpty () ) fprintf ( stdout, "WARNING: min_infix_len = 0. infix_fields are ignored\n" );
// boundary bool bInplaceEnable = hIndex.GetInt ( "inplace_enable", 0 ) != 0; int iHitGap = hIndex.GetSize ( "inplace_hit_gap", 0 ); int iDocinfoGap = hIndex.GetSize ( "inplace_docinfo_gap", 0 ); float fRelocFactor = hIndex.GetFloat ( "inplace_reloc_factor", 0.1f ); float fWriteFactor = hIndex.GetFloat ( "inplace_write_factor", 0.1f ); if ( bInplaceEnable ) { if ( fRelocFactor < 0.01f || fRelocFactor > 0.9f ) { fprintf ( stdout, "WARNING: inplace_reloc_factor must be 0.01 to 0.9, clamped\n" ); fRelocFactor = Min ( Max ( fRelocFactor, 0.01f ), 0.9f ); } if ( fWriteFactor < 0.01f || fWriteFactor > 0.9f ) { fprintf ( stdout, "WARNING: inplace_write_factor must be 0.01 to 0.9, clamped\n" ); fWriteFactor = Min ( Max ( fWriteFactor, 0.01f ), 0.9f ); } if ( fWriteFactor+fRelocFactor > 1.0f ) { fprintf ( stdout, "WARNING: inplace_write_factor+inplace_reloc_factor must be less than 0.9, scaled\n" ); float fScale = 0.9f/(fWriteFactor+fRelocFactor); fRelocFactor *= fScale; fWriteFactor *= fScale; } }
接下来准备数据源,其实发现Indexer在准备这些工作时很繁琐,一遍又一遍的检查相关配置信息是否完全,前面检查了后面还查,可能是出于严谨的考虑吧,这里提一下dSource是一个CSphSource的数组,每一个CSphSource类型的pSource对应一个数据源,因为配置信息中可能会存在多个数据源,所以会有多个pSource。程序会在hIndex中搜索Key值为Source的键值对,提取出对应的值作为pSourceName ,在本例中,我们只有配置文件中的一个Source即mysql。我们看一下CSphSource类型结构。其中包含有三个大部分,第一大部分存储文本分词后的word信息,每一个word(也许是字也许是词)对应一个WordHit,这个WordHit描述该word的相关信息,唯一标示该word。其中WordHit中又包含三部分,分别为word的文档ID,表示该word属于哪一篇文档;word的ID,表示该word在字典中的对应ID;Word的位置,表示该word在文档中的偏移量。第二大部分存储Source中文档的相关信息,其中亦包含了三部分,分别问文档ID;文档中列的数目,以及列对应的指针。第三大部分存储的就是doc中的属性字段信息。
/// generic data source class CSphSource : public CSphSourceSettings { public: CSphVectorm_dHits; ///< current document split into words CSphDocInfo m_tDocInfo; ///< current document info CSphVector m_dStrAttrs;///< current document string attrs
// parse all sources CSphVectordSources; bool bGotAttrs = false; bool bSpawnFailed = false; for ( CSphVariant * pSourceName = hIndex("source"); pSourceName; pSourceName = pSourceName->m_pNext ) { if ( !hSources ( pSourceName->cstr() ) ) { fprintf ( stdout, "ERROR: index '%s': source '%s' not found.\n", sIndexName, pSourceName->cstr() ); continue; } const CSphConfigSection & hSource = hSources [ pSourceName->cstr() ]; CSphSource * pSource = SpawnSource ( hSource, pSourceName->cstr(), pTokenizer->IsUtf8 () );//通过SpawnSource完成对于数据源的解析,其中包括了属性列,需要构建索引列等相关信息 if ( !pSource ) { bSpawnFailed = true; continue; } if ( pSource->HasAttrsConfigured() ) bGotAttrs = true;//判断数据源中是否有指定属性项 pSource->SetupFieldMatch ( sPrefixFields.cstr (), sInfixFields.cstr () ); pSource->SetTokenizer ( pTokenizer );//为每一个Source准备一个分词器 dSources.Add ( pSource );//将解析好的某个Source加入Source数组中去,因为可能存在多个Source }
// do index CSphIndex * pIndex = sphCreateIndexPhrase ( sIndexPath.cstr() ); assert ( pIndex ); // check lock file if ( !pIndex->Lock() ) { fprintf ( stdout, "FATAL: %s, will not index. Try --rotate option.\n", pIndex->GetLastError().cstr() ); exit ( 1 ); } CSphIndexSettings tSettings; sphConfIndex ( hIndex, tSettings ); if ( tSettings.m_bIndexExactWords && !tDictSettings.HasMorphology () ) { tSettings.m_bIndexExactWords = false; fprintf ( stdout, "WARNING: index '%s': no morphology, index_exact_words=1 has no effect, ignoring\n", sIndexName ); } if ( bGotAttrs && tSettings.m_eDocinfo==SPH_DOCINFO_NONE ) { fprintf ( stdout, "FATAL: index '%s': got attributes, but docinfo is 'none' (fix your config file).\n", sIndexName ); exit ( 1 ); } pIndex->SetProgressCallback ( ShowProgress ); if ( bInplaceEnable ) pIndex->SetInplaceSettings ( iHitGap, iDocinfoGap, fRelocFactor, fWriteFactor ); pIndex->SetTokenizer ( pTokenizer ); pIndex->SetDictionary ( pDict ); pIndex->Setup ( tSettings ); bOK = pIndex->Build ( dSources, g_iMemLimit, g_iWriteBuffer )!=0;//Build函数是索引构建的重点,所有的核心操作都在其中 if ( bOK && g_bRotate ) { sIndexPath.SetSprintf ( "%s.new", hIndex["path"].cstr() ); bOK = pIndex->Rename ( sIndexPath.cstr() ); } if ( !bOK ) fprintf ( stdout, "ERROR: index '%s': %s.\n", sIndexName, pIndex->GetLastError().cstr() ); pIndex->Unlock ();
//sphinx.cpp Build ( const CSphVector
// setup sources ARRAY_FOREACH ( iSource, dSources ) { CSphSource * pSource = dSources[iSource]; assert ( pSource ); pSource->SetDict ( m_pDict ); pSource->Setup ( m_tSettings ); }
// connect 1st source and fetch its schema if ( !dSources[0]->Connect ( m_sLastError ) || !dSources[0]->IterateHitsStart ( m_sLastError ) || !dSources[0]->UpdateSchema ( &m_tSchema, m_sLastError ) ) { return 0; }
// create temp files CSphAutofile fdLock ( GetIndexFileName("tmp0"), SPH_O_NEW, m_sLastError, true ); CSphAutofile fdHits ( GetIndexFileName ( m_bInplaceSettings ? "spp" : "tmp1" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings ); CSphAutofile fdDocinfos ( GetIndexFileName ( m_bInplaceSettings ? "spa" : "tmp2" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings ); CSphAutofile fdTmpFieldMVAs ( GetIndexFileName("tmp7"), SPH_O_NEW, m_sLastError, true ); CSphWriter tOrdWriter; CSphString sRawOrdinalsFile = GetIndexFileName("tmp4");
// fetch documents for ( ;; ) { // get next doc, and handle errors if ( !pSource->IterateHitsNext ( m_sLastError ) ) return 0;
bool CSphSource_Document::IterateHitsNext ( CSphString & sError ) { assert ( m_pTokenizer ); PROFILE ( src_document ); BYTE ** dFields = NextDocument ( sError );//从数据源中提取需要构建索引的列 if ( m_tDocInfo.m_iDocID==0 ) return true; if ( !dFields ) return false; m_tStats.m_iTotalDocuments++; m_dHits.Reserve ( 1024 ); m_dHits.Resize ( 0 ); BuildHits ( dFields, -1, 0 );//针对提取出的需要索引的列构建索引 return true; }
// get next non-zero-id row do { // try to get next row bool bGotRow = SqlFetchRow ();//首先尝试能否正常取出一条记录 // when the party's over... while ( !bGotRow )//如果取不出来这条记录,再继续思考原因 { // is that an error? if ( SqlIsError() ) { sError.SetSprintf ( "sql_fetch_row: %s", SqlError() ); m_tDocInfo.m_iDocID = 1; // 0 means legal eof return NULL; } // maybe we can do next step yet? if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) ) { // if there's a message, there's an error // otherwise, we're just over if ( !sError.IsEmpty() ) { m_tDocInfo.m_iDocID = 1; // 0 means legal eof return NULL; } } else { // step went fine; try to fetch bGotRow = SqlFetchRow (); continue; } SqlDismi***esult (); // ok, we're over ARRAY_FOREACH ( i, m_tParams.m_dQueryPost ) { if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) ) { sphWarn ( "sql_query_post[%d]: error=%s, query=%s", i, SqlError(), m_tParams.m_dQueryPost[i].cstr() ); break; } SqlDismi***esult (); } m_tDocInfo.m_iDocID = 0; // 0 means legal eof return NULL; } // get him!//成功取得后 m_tDocInfo.m_iDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );//判断ID是否为0,是否越界 m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_iDocID ); } while ( !m_tDocInfo.m_iDocID );
ARRAY_FOREACH ( i, m_tSchema.m_dFields ) { #if USE_ZLIB if ( m_dUnpack[i] != SPH_UNPACK_NONE ) { m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, m_dUnpack[i] ); continue; } #endif m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex ); } int iFieldMVA = 0; for ( int i=0; i提取出相关数据后,针对每一条需要索引的item开始构建索引,进入BuildHit函数,首先先初始化相关参数,准备分词器缓存
ARRAY_FOREACH ( iField, m_tSchema.m_dFields ) { //BYTE * sField = dFields[iField]; BYTE * sField = GetField(dFields, iField);//取出索引字段 if ( !sField ) continue; if ( m_bStripHTML ) m_pStripper->Strip ( sField ); int iFieldBytes = (int) strlen ( (char*)sField ); m_tStats.m_iTotalBytes += iFieldBytes; m_pTokenizer->SetBuffer ( sField, iFieldBytes );//设置分词器缓存,实际上就是索引字段大小,准备针对索引字段进行分词 BYTE * sWord; int iPos = HIT_PACK(iField,0); int iLastStep = 1; bool bPrefixField = m_tSchema.m_dFields[iField].m_eWordpart == SPH_WORDPART_PREFIX; bool bInfixMode = m_iMinInfixLen > 0; BYTE sBuf [ 16+3*SPH_MAX_WORD_LEN ];然后开始分词,分词的过程在这里不具体讲了,这不属于Sphinx的主要涉足领域,当我们把iField即要索引的字段放入分词器中依次解析,然后将分出的词赋值给sWord,将sWord的位置计算后赋值给ipos
// index words only while ( ( sWord = m_pTokenizer->GetToken() )!=NULL ) { iPos += iLastStep + m_pTokenizer->GetOvershortCount()*m_iOvershortStep; if ( m_pTokenizer->GetBoundary() ) iPos = Max ( iPos+m_iBoundaryStep, 1 ); iLastStep = 1; if ( bGlobalPartialMatch ) { int iBytes = strlen ( (const char*)sWord ); memcpy ( sBuf + 1, sWord, iBytes ); sBuf [0] = MAGIC_WORD_HEAD; sBuf [iBytes + 1] = '\0'; SphWordID_t iWord = m_pDict->GetWordIDWithMarkers ( sBuf ); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add (); tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; } }将分词后的sWord去词典中查找它对应的词ID,这样我们就收集全了这个词的所有详细信息,创建一个类型为CSphWordHit类型的tHit,其中存储了该sWord所在的DocID,在词典中对应的词ID,以及在文档中词的位置信息Pos
SphWordID_t iWord = m_pDict->GetWordID ( sWord ); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add ();//将tHit放入dHit中去 tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; } else { iLastStep = m_iStopwordStep; }处理完该词后,如果是中文的话还会进一步去判断其是否有近义词出现,其主要的函数为GetThesaurus,这里要简单说明下采用的MMSEG分词法,比如我们分词得到了中华,那么它还会继续从词典中去找是否存在其扩展词段(这里姑且翻译成近义词)如中华人民,×××,然后也会把他也存入进去(对于MMSEG的中文分词方法还有待进一步研究,这我只能照着代码念了),最后将所有的sWord的信息tHit都放入到m_dHits中去,形成我们的词索引spp索引
// zh_cn only GetThesaurus { int iBytes = strlen ( (const char*)sWord ); const BYTE* tbuf_ptr = m_pTokenizer->GetThesaurus(sWord, iBytes); if(tbuf_ptr) { while(*tbuf_ptr) { size_t len = strlen((const char*)tbuf_ptr); SphWordID_t iWord = m_pDict->GetWordID ( tbuf_ptr ,len , true); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add (); tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; //tHit.m_iBytePos = iBytePos; //tHit.m_iByteLen = iByteLen; //iLastStep = m_pTokenizer->TokenIsBlended() ? 0 : 1; //needs move this? } tbuf_ptr += len + 1; //move next } } //end if buf }//end GetThesaurus当该iField索引字段全部都索引完成后,在dHit中添加结束标记
// mark trailing hit if ( m_dHits.GetLength() ) m_dHits.Last().m_iWordPos |= HIT_FIELD_END;